mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 12:18:16 +00:00 
			
		
		
		
	Merge pull request #30090 from mtaufen/dynamic-kubelet-restart
Automatic merge from submit-queue [Kubelet] Optionally consume configuration from <node-name> named config maps This extends the Kubelet to check the API server for new node-specific config, and exit when it finds said new config. /cc @kubernetes/sig-node @mikedanese @timstclair @vishh **Release note**: ``` Extends Kubelet with Alpha Dynamic Kubelet Configuration. Please note that this alpha feature does not currently work with cloud provider auto-detection. ```
This commit is contained in:
		@@ -40,7 +40,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/resource"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/componentconfig"
 | 
			
		||||
	kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 | 
			
		||||
	v1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/capabilities"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/chaosclient"
 | 
			
		||||
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 | 
			
		||||
@@ -64,6 +64,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/network"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/server"
 | 
			
		||||
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
	utilconfig "k8s.io/kubernetes/pkg/util/config"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/configz"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/crypto"
 | 
			
		||||
@@ -73,7 +74,7 @@ import (
 | 
			
		||||
	nodeutil "k8s.io/kubernetes/pkg/util/node"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/oom"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/rlimit"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/runtime"
 | 
			
		||||
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/wait"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/version"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume"
 | 
			
		||||
@@ -306,6 +307,117 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) {
 | 
			
		||||
	clientConfig, err := CreateAPIServerClientConfig(s)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		kubeClient, err := clientset.NewForConfig(clientConfig)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		return kubeClient, nil
 | 
			
		||||
	}
 | 
			
		||||
	return nil, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Tries to download the kubelet-<node-name> configmap from "kube-system" namespace via the API server and returns a JSON string or error
 | 
			
		||||
func getRemoteKubeletConfig(s *options.KubeletServer, kcfg *KubeletConfig) (string, error) {
 | 
			
		||||
	// TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request
 | 
			
		||||
	kubeClient, err := getKubeClient(s)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	configmap, err := func() (*api.ConfigMap, error) {
 | 
			
		||||
		var nodename string
 | 
			
		||||
		hostname := nodeutil.GetHostname(s.HostnameOverride)
 | 
			
		||||
 | 
			
		||||
		if kcfg != nil && kcfg.Cloud != nil {
 | 
			
		||||
			instances, ok := kcfg.Cloud.Instances()
 | 
			
		||||
			if !ok {
 | 
			
		||||
				err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename.")
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			nodename, err = instances.CurrentNodeName(hostname)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				err = fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			// look for kubelet-<node-name> configmap from "kube-system"
 | 
			
		||||
			configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", nodename))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			return configmap, nil
 | 
			
		||||
		}
 | 
			
		||||
		// No cloud provider yet, so can't get the nodename via Cloud.Instances().CurrentNodeName(hostname), try just using the hostname
 | 
			
		||||
		configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", hostname))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("cloud provider was nil, and attempt to use hostname to find config resulted in: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		return configmap, nil
 | 
			
		||||
	}()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// When we create the KubeletConfiguration configmap, we put a json string
 | 
			
		||||
	// representation of the config in a `kubelet.config` key.
 | 
			
		||||
	jsonstr, ok := configmap.Data["kubelet.config"]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return "", fmt.Errorf("KubeletConfiguration configmap did not contain a value with key `kubelet.config`")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return jsonstr, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func startKubeletConfigSyncLoop(s *options.KubeletServer, currentKC string) {
 | 
			
		||||
	glog.Infof("Starting Kubelet configuration sync loop")
 | 
			
		||||
	go func() {
 | 
			
		||||
		wait.PollInfinite(30*time.Second, func() (bool, error) {
 | 
			
		||||
			glog.Infof("Checking API server for new Kubelet configuration.")
 | 
			
		||||
			remoteKC, err := getRemoteKubeletConfig(s, nil)
 | 
			
		||||
			if err == nil {
 | 
			
		||||
				// Detect new config by comparing with the last JSON string we extracted.
 | 
			
		||||
				if remoteKC != currentKC {
 | 
			
		||||
					glog.Info("Found new Kubelet configuration via API server, restarting!")
 | 
			
		||||
					os.Exit(0)
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				glog.Infof("Did not find a configuration for this Kubelet via API server: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			return false, nil // Always return (false, nil) so we poll forever.
 | 
			
		||||
		})
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Try to check for config on the API server, return that config if we get it, and start
 | 
			
		||||
// a background thread that checks for updates to configs.
 | 
			
		||||
func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletConfiguration, error) {
 | 
			
		||||
	jsonstr, err := getRemoteKubeletConfig(s, nil)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		// We will compare future API server config against the config we just got (jsonstr):
 | 
			
		||||
		startKubeletConfigSyncLoop(s, jsonstr)
 | 
			
		||||
 | 
			
		||||
		// Convert json from API server to external type struct, and convert that to internal type struct
 | 
			
		||||
		extKC := v1alpha1.KubeletConfiguration{}
 | 
			
		||||
		err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(jsonstr), &extKC)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		kc := componentconfig.KubeletConfiguration{}
 | 
			
		||||
		err = api.Scheme.Convert(&extKC, &kc, nil)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		return &kc, nil
 | 
			
		||||
	} else {
 | 
			
		||||
		// Couldn't get a configuration from the API server yet.
 | 
			
		||||
		// Restart as soon as anything comes back from the API server.
 | 
			
		||||
		startKubeletConfigSyncLoop(s, "")
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Run runs the specified KubeletServer for the given KubeletConfig.  This should never exit.
 | 
			
		||||
// The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer.
 | 
			
		||||
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
 | 
			
		||||
@@ -326,6 +438,22 @@ func checkPermissions() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func setConfigz(cz *configz.Config, kc *componentconfig.KubeletConfiguration) {
 | 
			
		||||
	tmp := v1alpha1.KubeletConfiguration{}
 | 
			
		||||
	api.Scheme.Convert(kc, &tmp, nil)
 | 
			
		||||
	cz.Set(tmp)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, error) {
 | 
			
		||||
	cz, err := configz.New("componentconfig")
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		setConfigz(cz, kc)
 | 
			
		||||
	} else {
 | 
			
		||||
		glog.Errorf("unable to register configz: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
	return cz, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
 | 
			
		||||
	if s.ExitOnLockContention && s.LockFilePath == "" {
 | 
			
		||||
		return errors.New("cannot exit on lock file contention: no lock file specified")
 | 
			
		||||
@@ -344,18 +472,38 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if c, err := configz.New("componentconfig"); err == nil {
 | 
			
		||||
		c.Set(s.KubeletConfiguration)
 | 
			
		||||
	} else {
 | 
			
		||||
		glog.Errorf("unable to register configz: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Register current configuration with /configz endpoint
 | 
			
		||||
	cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
 | 
			
		||||
 | 
			
		||||
	if kcfg == nil {
 | 
			
		||||
		if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
 | 
			
		||||
			// Look for config on the API server. If it exists, replace s.KubeletConfiguration
 | 
			
		||||
			// with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.
 | 
			
		||||
 | 
			
		||||
			// Don't do dynamic Kubelet configuration in runonce mode
 | 
			
		||||
			if s.RunOnce == false {
 | 
			
		||||
				// For now we only do dynamic config when kcfg is passed as nil, because we don't want to disturb
 | 
			
		||||
				// any special relationship between the values in s (KubeletServer) and kcfg (KubeletConfig).
 | 
			
		||||
				remoteKC, err := initKubeletConfigSync(s)
 | 
			
		||||
				if err == nil {
 | 
			
		||||
					// Update s (KubeletServer) with new config from API server
 | 
			
		||||
					s.KubeletConfiguration = *remoteKC
 | 
			
		||||
					// Ensure that /configz is up to date with the new config
 | 
			
		||||
					if cfgzErr != nil {
 | 
			
		||||
						glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
 | 
			
		||||
					} else {
 | 
			
		||||
						setConfigz(cfgz, &s.KubeletConfiguration)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		var kubeClient, eventClient *clientset.Clientset
 | 
			
		||||
		var autoDetectCloudProvider bool
 | 
			
		||||
		var cloud cloudprovider.Interface
 | 
			
		||||
 | 
			
		||||
		if s.CloudProvider == kubeExternal.AutoDetectCloudProvider {
 | 
			
		||||
		if s.CloudProvider == v1alpha1.AutoDetectCloudProvider {
 | 
			
		||||
			autoDetectCloudProvider = true
 | 
			
		||||
		} else {
 | 
			
		||||
			cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
 | 
			
		||||
@@ -440,7 +588,8 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
 | 
			
		||||
		glog.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	runtime.ReallyCrash = s.ReallyCrashForTesting
 | 
			
		||||
	utilruntime.ReallyCrash = s.ReallyCrashForTesting
 | 
			
		||||
 | 
			
		||||
	rand.Seed(time.Now().UTC().UnixNano())
 | 
			
		||||
 | 
			
		||||
	// TODO(vmarmol): Do this through container config.
 | 
			
		||||
 
 | 
			
		||||
@@ -38,14 +38,16 @@ const (
 | 
			
		||||
	// specification of gates. Examples:
 | 
			
		||||
	//   AllAlpha=false,NewFeature=true  will result in newFeature=true
 | 
			
		||||
	//   AllAlpha=true,NewFeature=false  will result in newFeature=false
 | 
			
		||||
	allAlphaGate = "AllAlpha"
 | 
			
		||||
	allAlphaGate         = "AllAlpha"
 | 
			
		||||
	dynamicKubeletConfig = "DynamicKubeletConfig"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	// Default values for recorded features.  Every new feature gate should be
 | 
			
		||||
	// represented here.
 | 
			
		||||
	knownFeatures = map[string]featureSpec{
 | 
			
		||||
		allAlphaGate: {false, alpha},
 | 
			
		||||
		allAlphaGate:         {false, alpha},
 | 
			
		||||
		dynamicKubeletConfig: {false, alpha},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Special handling for a few gates.
 | 
			
		||||
@@ -86,6 +88,7 @@ type FeatureGate interface {
 | 
			
		||||
	// MyFeature() bool
 | 
			
		||||
 | 
			
		||||
	// TODO: Define accessors for each non-API alpha feature.
 | 
			
		||||
	DynamicKubeletConfig() bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// featureGate implements FeatureGate as well as pflag.Value for flag parsing.
 | 
			
		||||
@@ -154,6 +157,11 @@ func (f *featureGate) Type() string {
 | 
			
		||||
	return "mapStringBool"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DynamicKubeletConfig returns value for dynamicKubeletConfig
 | 
			
		||||
func (f *featureGate) DynamicKubeletConfig() bool {
 | 
			
		||||
	return f.lookup(dynamicKubeletConfig)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *featureGate) lookup(key string) bool {
 | 
			
		||||
	defaultValue := f.known[key].enabled
 | 
			
		||||
	if f.enabled != nil {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										195
									
								
								test/e2e_node/dynamic_kubelet_configuration_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										195
									
								
								test/e2e_node/dynamic_kubelet_configuration_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,195 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package e2e_node
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/componentconfig"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 | 
			
		||||
	"k8s.io/kubernetes/test/e2e/framework"
 | 
			
		||||
 | 
			
		||||
	. "github.com/onsi/ginkgo"
 | 
			
		||||
	. "github.com/onsi/gomega"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// This test is marked [Disruptive] because the Kubelet temporarily goes down as part of of this test.
 | 
			
		||||
var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:dynamicKubeletConfig] [Disruptive]", func() {
 | 
			
		||||
	f := framework.NewDefaultFramework("dynamic-kubelet-configuration-test")
 | 
			
		||||
 | 
			
		||||
	Context("When a configmap called `kubelet-<node-name>` is added to the `kube-system` namespace", func() {
 | 
			
		||||
		It("The Kubelet on that node should restart to take up the new config", func() {
 | 
			
		||||
			const (
 | 
			
		||||
				restartGap = 40 * time.Second
 | 
			
		||||
			)
 | 
			
		||||
 | 
			
		||||
			// Get the current KubeletConfiguration (known to be valid) by
 | 
			
		||||
			// querying the configz endpoint for the current node.
 | 
			
		||||
			resp := pollConfigz(2*time.Minute, 5*time.Second)
 | 
			
		||||
			kubeCfg, err := decodeConfigz(resp)
 | 
			
		||||
			framework.ExpectNoError(err)
 | 
			
		||||
			glog.Infof("KubeletConfiguration - Initial values: %+v", *kubeCfg)
 | 
			
		||||
 | 
			
		||||
			// Change a safe value e.g. file check frequency.
 | 
			
		||||
			// Make sure we're providing a value distinct from the current one.
 | 
			
		||||
			oldFileCheckFrequency := kubeCfg.FileCheckFrequency.Duration
 | 
			
		||||
			newFileCheckFrequency := 11 * time.Second
 | 
			
		||||
			if kubeCfg.FileCheckFrequency.Duration == newFileCheckFrequency {
 | 
			
		||||
				newFileCheckFrequency = 10 * time.Second
 | 
			
		||||
			}
 | 
			
		||||
			kubeCfg.FileCheckFrequency.Duration = newFileCheckFrequency
 | 
			
		||||
 | 
			
		||||
			// Use the new config to create a new kube-<node-name> configmap in `kube-system` namespace.
 | 
			
		||||
			_, err = createConfigMap(f, kubeCfg)
 | 
			
		||||
			framework.ExpectNoError(err)
 | 
			
		||||
 | 
			
		||||
			// Give the Kubelet time to see that there is new config and restart. If we don't do this,
 | 
			
		||||
			// the Kubelet will still have the old config when we poll, and the test will fail.
 | 
			
		||||
			time.Sleep(restartGap)
 | 
			
		||||
 | 
			
		||||
			// Use configz to get the new config.
 | 
			
		||||
			resp = pollConfigz(2*time.Minute, 5*time.Second)
 | 
			
		||||
			kubeCfg, err = decodeConfigz(resp)
 | 
			
		||||
			framework.ExpectNoError(err)
 | 
			
		||||
			glog.Infof("KubeletConfiguration - After modification of FileCheckFrequency: %+v", *kubeCfg)
 | 
			
		||||
 | 
			
		||||
			// We expect to see the new value in the new config.
 | 
			
		||||
			Expect(kubeCfg.FileCheckFrequency.Duration).To(Equal(newFileCheckFrequency))
 | 
			
		||||
 | 
			
		||||
			// Change the config back to what it originally was.
 | 
			
		||||
			kubeCfg.FileCheckFrequency.Duration = oldFileCheckFrequency
 | 
			
		||||
			_, err = updateConfigMap(f, kubeCfg)
 | 
			
		||||
			framework.ExpectNoError(err)
 | 
			
		||||
 | 
			
		||||
			// Give the Kubelet time to see that there is new config and restart. If we don't do this,
 | 
			
		||||
			// the Kubelet will still have the old config when we poll, and the test will fail.
 | 
			
		||||
			time.Sleep(restartGap)
 | 
			
		||||
 | 
			
		||||
			// User configz to get the new config.
 | 
			
		||||
			resp = pollConfigz(2*time.Minute, 5*time.Second)
 | 
			
		||||
			kubeCfg, err = decodeConfigz(resp)
 | 
			
		||||
			framework.ExpectNoError(err)
 | 
			
		||||
			glog.Infof("KubeletConfiguration - After restoration of FileCheckFrequency: %+v", *kubeCfg)
 | 
			
		||||
 | 
			
		||||
			// We expect to see the original value restored in the new config.
 | 
			
		||||
			Expect(kubeCfg.FileCheckFrequency.Duration).To(Equal(oldFileCheckFrequency))
 | 
			
		||||
		})
 | 
			
		||||
	})
 | 
			
		||||
})
 | 
			
		||||
 | 
			
		||||
// This function either causes the test to fail, or it returns a status 200 response.
 | 
			
		||||
func pollConfigz(timeout time.Duration, pollInterval time.Duration) *http.Response {
 | 
			
		||||
	endpoint := fmt.Sprintf("http://127.0.0.1:8080/api/v1/proxy/nodes/%s/configz", framework.TestContext.NodeName)
 | 
			
		||||
	client := &http.Client{}
 | 
			
		||||
	req, err := http.NewRequest("GET", endpoint, nil)
 | 
			
		||||
	framework.ExpectNoError(err)
 | 
			
		||||
	req.Header.Add("Accept", "application/json")
 | 
			
		||||
 | 
			
		||||
	var resp *http.Response
 | 
			
		||||
	Eventually(func() bool {
 | 
			
		||||
		resp, err = client.Do(req)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Errorf("Failed to get /configz, retrying. Error: %v", err)
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
		if resp.StatusCode != 200 {
 | 
			
		||||
			glog.Errorf("/configz response status not 200, retrying. Response was: %+v", resp)
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
		return true
 | 
			
		||||
	}, timeout, pollInterval).Should(Equal(true))
 | 
			
		||||
	return resp
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Decodes the http response  from /configz and returns a componentconfig.KubeletConfiguration (internal type).
 | 
			
		||||
func decodeConfigz(resp *http.Response) (*componentconfig.KubeletConfiguration, error) {
 | 
			
		||||
	// This hack because /configz reports the following structure:
 | 
			
		||||
	// {"componentconfig": {the JSON representation of v1alpha1.KubeletConfiguration}}
 | 
			
		||||
	type configzWrapper struct {
 | 
			
		||||
		ComponentConfig v1alpha1.KubeletConfiguration `json:"componentconfig"`
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	configz := configzWrapper{}
 | 
			
		||||
	kubeCfg := componentconfig.KubeletConfiguration{}
 | 
			
		||||
 | 
			
		||||
	contentsBytes, err := ioutil.ReadAll(resp.Body)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = json.Unmarshal(contentsBytes, &configz)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = api.Scheme.Convert(&configz.ComponentConfig, &kubeCfg, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &kubeCfg, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Uses KubeletConfiguration to create a `kubelet-<node-name>` ConfigMap in the "kube-system" namespace.
 | 
			
		||||
func createConfigMap(f *framework.Framework, kubeCfg *componentconfig.KubeletConfiguration) (*api.ConfigMap, error) {
 | 
			
		||||
	kubeCfgExt := v1alpha1.KubeletConfiguration{}
 | 
			
		||||
	api.Scheme.Convert(kubeCfg, &kubeCfgExt, nil)
 | 
			
		||||
 | 
			
		||||
	bytes, err := json.Marshal(kubeCfgExt)
 | 
			
		||||
	framework.ExpectNoError(err)
 | 
			
		||||
 | 
			
		||||
	cmap, err := f.Client.ConfigMaps("kube-system").Create(&api.ConfigMap{
 | 
			
		||||
		ObjectMeta: api.ObjectMeta{
 | 
			
		||||
			Name: fmt.Sprintf("kubelet-%s", framework.TestContext.NodeName),
 | 
			
		||||
		},
 | 
			
		||||
		Data: map[string]string{
 | 
			
		||||
			"kubelet.config": string(bytes),
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return cmap, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Similar to createConfigMap, except this updates an existing ConfigMap.
 | 
			
		||||
func updateConfigMap(f *framework.Framework, kubeCfg *componentconfig.KubeletConfiguration) (*api.ConfigMap, error) {
 | 
			
		||||
	kubeCfgExt := v1alpha1.KubeletConfiguration{}
 | 
			
		||||
	api.Scheme.Convert(kubeCfg, &kubeCfgExt, nil)
 | 
			
		||||
 | 
			
		||||
	bytes, err := json.Marshal(kubeCfgExt)
 | 
			
		||||
	framework.ExpectNoError(err)
 | 
			
		||||
 | 
			
		||||
	cmap, err := f.Client.ConfigMaps("kube-system").Update(&api.ConfigMap{
 | 
			
		||||
		ObjectMeta: api.ObjectMeta{
 | 
			
		||||
			Name: fmt.Sprintf("kubelet-%s", framework.TestContext.NodeName),
 | 
			
		||||
		},
 | 
			
		||||
		Data: map[string]string{
 | 
			
		||||
			"kubelet.config": string(bytes),
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return cmap, nil
 | 
			
		||||
}
 | 
			
		||||
@@ -87,7 +87,7 @@ func (e *E2EServices) Start() error {
 | 
			
		||||
		"--manifest-path", framework.TestContext.ManifestPath,
 | 
			
		||||
		"--eviction-hard", framework.TestContext.EvictionHard,
 | 
			
		||||
	)
 | 
			
		||||
	e.services = newServer("services", startCmd, nil, getHealthCheckURLs(), servicesLogFile)
 | 
			
		||||
	e.services = newServer("services", startCmd, nil, nil, getHealthCheckURLs(), servicesLogFile, false)
 | 
			
		||||
	return e.services.start()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -335,7 +335,7 @@ func (es *e2eService) startNamespaceController() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (es *e2eService) startKubeletServer() (*server, error) {
 | 
			
		||||
	var killCommand *exec.Cmd
 | 
			
		||||
	var killCommand, restartCommand *exec.Cmd
 | 
			
		||||
	cmdArgs := []string{}
 | 
			
		||||
	if systemdRun, err := exec.LookPath("systemd-run"); err == nil {
 | 
			
		||||
		// On systemd services, detection of a service / unit works reliably while
 | 
			
		||||
@@ -343,8 +343,9 @@ func (es *e2eService) startKubeletServer() (*server, error) {
 | 
			
		||||
		// Since kubelet will typically be run as a service it also makes more
 | 
			
		||||
		// sense to test it that way
 | 
			
		||||
		unitName := fmt.Sprintf("kubelet-%d.service", rand.Int31())
 | 
			
		||||
		cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, getKubeletServerBin())
 | 
			
		||||
		cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, "--remain-after-exit", getKubeletServerBin())
 | 
			
		||||
		killCommand = exec.Command("sudo", "systemctl", "kill", unitName)
 | 
			
		||||
		restartCommand = exec.Command("sudo", "systemctl", "restart", unitName)
 | 
			
		||||
		es.logFiles["kubelet.log"] = logFileData{
 | 
			
		||||
			journalctlCommand: []string{"-u", unitName},
 | 
			
		||||
		}
 | 
			
		||||
@@ -372,6 +373,7 @@ func (es *e2eService) startKubeletServer() (*server, error) {
 | 
			
		||||
		"--pod-cidr=10.180.0.0/24", // Assign a fixed CIDR to the node because there is no node controller.
 | 
			
		||||
		"--eviction-hard", framework.TestContext.EvictionHard,
 | 
			
		||||
		"--eviction-pressure-transition-period", "30s",
 | 
			
		||||
		"--feature-gates", "DynamicKubeletConfig=true", // TODO(mtaufen): Eventually replace with a value from the framework.TestContext
 | 
			
		||||
	)
 | 
			
		||||
	if framework.TestContext.CgroupsPerQOS {
 | 
			
		||||
		// TODO: enable this when the flag is stable and available in kubelet.
 | 
			
		||||
@@ -394,8 +396,10 @@ func (es *e2eService) startKubeletServer() (*server, error) {
 | 
			
		||||
		"kubelet",
 | 
			
		||||
		cmd,
 | 
			
		||||
		killCommand,
 | 
			
		||||
		restartCommand,
 | 
			
		||||
		[]string{kubeletHealthCheckURL},
 | 
			
		||||
		"kubelet.log")
 | 
			
		||||
		"kubelet.log",
 | 
			
		||||
		true)
 | 
			
		||||
	return server, server.start()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -408,20 +412,32 @@ type server struct {
 | 
			
		||||
	// killCommand is the command used to stop the server. It is not required. If it
 | 
			
		||||
	// is not specified, `sudo kill` will be used to stop the server.
 | 
			
		||||
	killCommand *exec.Cmd
 | 
			
		||||
	// restartCommand is the command used to restart the server. If provided, it will be used
 | 
			
		||||
	// instead of startCommand when restarting the server.
 | 
			
		||||
	restartCommand *exec.Cmd
 | 
			
		||||
	// healthCheckUrls is the urls used to check whether the server is ready.
 | 
			
		||||
	healthCheckUrls []string
 | 
			
		||||
	// outFilename is the name of the log file. The stdout and stderr of the server
 | 
			
		||||
	// will be redirected to this file.
 | 
			
		||||
	outFilename string
 | 
			
		||||
	// restartOnExit determines whether a restart loop is launched with the server
 | 
			
		||||
	restartOnExit bool
 | 
			
		||||
	// Writing to this channel, if it is not nil, stops the restart loop.
 | 
			
		||||
	// When tearing down a server, you should check for this channel and write to it if it exists.
 | 
			
		||||
	stopRestartingCh chan<- bool
 | 
			
		||||
	// Read from this to confirm that the restart loop has stopped.
 | 
			
		||||
	ackStopRestartingCh <-chan bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newServer(name string, start, kill *exec.Cmd, urls []string, filename string) *server {
 | 
			
		||||
func newServer(name string, start, kill, restart *exec.Cmd, urls []string, filename string, restartOnExit bool) *server {
 | 
			
		||||
	return &server{
 | 
			
		||||
		name:            name,
 | 
			
		||||
		startCommand:    start,
 | 
			
		||||
		killCommand:     kill,
 | 
			
		||||
		restartCommand:  restart,
 | 
			
		||||
		healthCheckUrls: urls,
 | 
			
		||||
		outFilename:     filename,
 | 
			
		||||
		restartOnExit:   restartOnExit,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -434,8 +450,8 @@ func commandToString(c *exec.Cmd) string {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *server) String() string {
 | 
			
		||||
	return fmt.Sprintf("server %q start-command: `%s`, kill-command: `%s`, health-check: %v, output-file: %q", s.name,
 | 
			
		||||
		commandToString(s.startCommand), commandToString(s.killCommand), s.healthCheckUrls, s.outFilename)
 | 
			
		||||
	return fmt.Sprintf("server %q start-command: `%s`, kill-command: `%s`, restart-command: `%s`, health-check: %v, output-file: %q", s.name,
 | 
			
		||||
		commandToString(s.startCommand), commandToString(s.killCommand), commandToString(s.restartCommand), s.healthCheckUrls, s.outFilename)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// readinessCheck checks whether services are ready via the health check urls. Once there is
 | 
			
		||||
@@ -481,8 +497,23 @@ func readinessCheck(urls []string, errCh <-chan error) error {
 | 
			
		||||
	return fmt.Errorf("e2e service readiness check timeout %v", *serverStartTimeout)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Note: restartOnExit == true requires len(s.healthCheckUrls) > 0 to work properly.
 | 
			
		||||
func (s *server) start() error {
 | 
			
		||||
	errCh := make(chan error)
 | 
			
		||||
 | 
			
		||||
	var stopRestartingCh, ackStopRestartingCh chan bool
 | 
			
		||||
	if s.restartOnExit {
 | 
			
		||||
		if len(s.healthCheckUrls) == 0 {
 | 
			
		||||
			return fmt.Errorf("Tried to start %s which has s.restartOnExit == true, but no health check urls provided.", s)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		stopRestartingCh = make(chan bool)
 | 
			
		||||
		ackStopRestartingCh = make(chan bool)
 | 
			
		||||
 | 
			
		||||
		s.stopRestartingCh = stopRestartingCh
 | 
			
		||||
		s.ackStopRestartingCh = ackStopRestartingCh
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(errCh)
 | 
			
		||||
 | 
			
		||||
@@ -496,10 +527,9 @@ func (s *server) start() error {
 | 
			
		||||
		defer outfile.Close()
 | 
			
		||||
		defer outfile.Sync()
 | 
			
		||||
 | 
			
		||||
		cmd := s.startCommand
 | 
			
		||||
		// Set the command to write the output file
 | 
			
		||||
		cmd.Stdout = outfile
 | 
			
		||||
		cmd.Stderr = outfile
 | 
			
		||||
		s.startCommand.Stdout = outfile
 | 
			
		||||
		s.startCommand.Stderr = outfile
 | 
			
		||||
 | 
			
		||||
		// Death of this test process should kill the server as well.
 | 
			
		||||
		attrs := &syscall.SysProcAttr{}
 | 
			
		||||
@@ -511,14 +541,96 @@ func (s *server) start() error {
 | 
			
		||||
			errCh <- fmt.Errorf("failed to set Pdeathsig field (non-linux build)")
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		cmd.SysProcAttr = attrs
 | 
			
		||||
		s.startCommand.SysProcAttr = attrs
 | 
			
		||||
 | 
			
		||||
		// Run the command
 | 
			
		||||
		err = cmd.Run()
 | 
			
		||||
		// Start the command
 | 
			
		||||
		err = s.startCommand.Start()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errCh <- fmt.Errorf("failed to run server start command %q: %v", commandToString(cmd), err)
 | 
			
		||||
			errCh <- fmt.Errorf("failed to run %s: %v", s, err)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		if !s.restartOnExit {
 | 
			
		||||
			// If we aren't planning on restarting, ok to Wait() here to release resources.
 | 
			
		||||
			// Otherwise, we Wait() in the restart loop.
 | 
			
		||||
			err = s.startCommand.Wait()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				errCh <- fmt.Errorf("failed to run %s: %v", s, err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			// New stuff
 | 
			
		||||
			usedStartCmd := true
 | 
			
		||||
			for {
 | 
			
		||||
				// Wait for an initial health check to pass, so that we are sure the server started.
 | 
			
		||||
				err := readinessCheck(s.healthCheckUrls, nil)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					if usedStartCmd {
 | 
			
		||||
						s.startCommand.Wait() // Release resources if necessary.
 | 
			
		||||
					}
 | 
			
		||||
					// This should not happen, immediately stop the e2eService process.
 | 
			
		||||
					glog.Fatalf("restart loop readinessCheck failed for %s", s)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				// Initial health check passed, wait until a health check fails again.
 | 
			
		||||
			stillAlive:
 | 
			
		||||
				for {
 | 
			
		||||
					select {
 | 
			
		||||
					case <-stopRestartingCh:
 | 
			
		||||
						ackStopRestartingCh <- true
 | 
			
		||||
						return
 | 
			
		||||
					case <-time.After(time.Second):
 | 
			
		||||
						for _, url := range s.healthCheckUrls {
 | 
			
		||||
							resp, err := http.Get(url)
 | 
			
		||||
							if err != nil || resp.StatusCode != http.StatusOK {
 | 
			
		||||
								break stillAlive
 | 
			
		||||
							}
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				if usedStartCmd {
 | 
			
		||||
					s.startCommand.Wait() // Release resources from last cmd
 | 
			
		||||
					usedStartCmd = false
 | 
			
		||||
				}
 | 
			
		||||
				if s.restartCommand != nil {
 | 
			
		||||
					// Always make a fresh copy of restartCommand before running, we may have to restart multiple times
 | 
			
		||||
					s.restartCommand = &exec.Cmd{
 | 
			
		||||
						Path:        s.restartCommand.Path,
 | 
			
		||||
						Args:        s.restartCommand.Args,
 | 
			
		||||
						Env:         s.restartCommand.Env,
 | 
			
		||||
						Dir:         s.restartCommand.Dir,
 | 
			
		||||
						Stdin:       s.restartCommand.Stdin,
 | 
			
		||||
						Stdout:      s.restartCommand.Stdout,
 | 
			
		||||
						Stderr:      s.restartCommand.Stderr,
 | 
			
		||||
						ExtraFiles:  s.restartCommand.ExtraFiles,
 | 
			
		||||
						SysProcAttr: s.restartCommand.SysProcAttr,
 | 
			
		||||
					}
 | 
			
		||||
					err = s.restartCommand.Run() // Run and wait for exit. This command is assumed to have short duration, e.g. systemctl restart
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						// This should not happen, immediately stop the e2eService process.
 | 
			
		||||
						glog.Fatalf("restarting %s with restartCommand failed. Error: %v.", s, err)
 | 
			
		||||
					}
 | 
			
		||||
				} else {
 | 
			
		||||
					s.startCommand = &exec.Cmd{
 | 
			
		||||
						Path:        s.startCommand.Path,
 | 
			
		||||
						Args:        s.startCommand.Args,
 | 
			
		||||
						Env:         s.startCommand.Env,
 | 
			
		||||
						Dir:         s.startCommand.Dir,
 | 
			
		||||
						Stdin:       s.startCommand.Stdin,
 | 
			
		||||
						Stdout:      s.startCommand.Stdout,
 | 
			
		||||
						Stderr:      s.startCommand.Stderr,
 | 
			
		||||
						ExtraFiles:  s.startCommand.ExtraFiles,
 | 
			
		||||
						SysProcAttr: s.startCommand.SysProcAttr,
 | 
			
		||||
					}
 | 
			
		||||
					err = s.startCommand.Start()
 | 
			
		||||
					usedStartCmd = true
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						// This should not happen, immediately stop the e2eService process.
 | 
			
		||||
						glog.Fatalf("restarting %s with startCommand failed. Error: %v.", s, err)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return readinessCheck(s.healthCheckUrls, errCh)
 | 
			
		||||
@@ -528,6 +640,12 @@ func (s *server) kill() error {
 | 
			
		||||
	name := s.name
 | 
			
		||||
	cmd := s.startCommand
 | 
			
		||||
 | 
			
		||||
	// If s has a restart loop, turn it off.
 | 
			
		||||
	if s.restartOnExit {
 | 
			
		||||
		s.stopRestartingCh <- true
 | 
			
		||||
		<-s.ackStopRestartingCh
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if s.killCommand != nil {
 | 
			
		||||
		return s.killCommand.Run()
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user