mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 10:18:13 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			994 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			994 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2015 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| // Package app makes it easy to create a kubelet server for various contexts.
 | |
| package app
 | |
| 
 | |
| import (
 | |
| 	"crypto/tls"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io/ioutil"
 | |
| 	"math/rand"
 | |
| 	"net"
 | |
| 	"net/http"
 | |
| 	_ "net/http/pprof"
 | |
| 	"net/url"
 | |
| 	"os"
 | |
| 	"path"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/golang/glog"
 | |
| 	"github.com/spf13/cobra"
 | |
| 	"github.com/spf13/pflag"
 | |
| 
 | |
| 	"k8s.io/apimachinery/pkg/api/resource"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/types"
 | |
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/sets"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	"k8s.io/apiserver/pkg/server/healthz"
 | |
| 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | |
| 	clientgoclientset "k8s.io/client-go/kubernetes"
 | |
| 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 | |
| 	clientv1 "k8s.io/client-go/pkg/api/v1"
 | |
| 	restclient "k8s.io/client-go/rest"
 | |
| 	clientauth "k8s.io/client-go/tools/auth"
 | |
| 	"k8s.io/client-go/tools/clientcmd"
 | |
| 	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	certutil "k8s.io/client-go/util/cert"
 | |
| 	"k8s.io/kubernetes/cmd/kubelet/app/options"
 | |
| 	"k8s.io/kubernetes/pkg/api"
 | |
| 	"k8s.io/kubernetes/pkg/api/v1"
 | |
| 	"k8s.io/kubernetes/pkg/apis/componentconfig"
 | |
| 	componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 | |
| 	"k8s.io/kubernetes/pkg/capabilities"
 | |
| 	"k8s.io/kubernetes/pkg/client/chaosclient"
 | |
| 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 | |
| 	"k8s.io/kubernetes/pkg/cloudprovider"
 | |
| 	"k8s.io/kubernetes/pkg/credentialprovider"
 | |
| 	"k8s.io/kubernetes/pkg/features"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/cm"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/config"
 | |
| 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/dockershim"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
 | |
| 	dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 | |
| 	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/server"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/server/streaming"
 | |
| 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 | |
| 	"k8s.io/kubernetes/pkg/util/configz"
 | |
| 	"k8s.io/kubernetes/pkg/util/flock"
 | |
| 	kubeio "k8s.io/kubernetes/pkg/util/io"
 | |
| 	"k8s.io/kubernetes/pkg/util/mount"
 | |
| 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 | |
| 	"k8s.io/kubernetes/pkg/util/oom"
 | |
| 	"k8s.io/kubernetes/pkg/util/rlimit"
 | |
| 	"k8s.io/kubernetes/pkg/version"
 | |
| )
 | |
| 
 | |
// componentKubelet is the kubelet's component name; it is used as the command
// name and as the Component of the event source.
const componentKubelet = "kubelet"
 | |
| 
 | |
| // NewKubeletCommand creates a *cobra.Command object with default parameters
 | |
| func NewKubeletCommand() *cobra.Command {
 | |
| 	s := options.NewKubeletServer()
 | |
| 	s.AddFlags(pflag.CommandLine)
 | |
| 	cmd := &cobra.Command{
 | |
| 		Use: componentKubelet,
 | |
| 		Long: `The kubelet is the primary "node agent" that runs on each
 | |
| node. The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
 | |
| that describes a pod. The kubelet takes a set of PodSpecs that are provided through
 | |
| various mechanisms (primarily through the apiserver) and ensures that the containers
 | |
| described in those PodSpecs are running and healthy. The kubelet doesn't manage
 | |
| containers which were not created by Kubernetes.
 | |
| 
 | |
| Other than from an PodSpec from the apiserver, there are three ways that a container
 | |
| manifest can be provided to the Kubelet.
 | |
| 
 | |
| File: Path passed as a flag on the command line. Files under this path will be monitored
 | |
| periodically for updates. The monitoring period is 20s by default and is configurable
 | |
| via a flag.
 | |
| 
 | |
| HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
 | |
| is checked every 20 seconds (also configurable with a flag).
 | |
| 
 | |
| HTTP server: The kubelet can also listen for HTTP and respond to a simple API
 | |
| (underspec'd currently) to submit a new manifest.`,
 | |
| 		Run: func(cmd *cobra.Command, args []string) {
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	return cmd
 | |
| }
 | |
| 
 | |
| // UnsecuredKubeletDeps returns a KubeletDeps suitable for being run, or an error if the server setup
 | |
| // is not valid.  It will not start any background processes, and does not include authentication/authorization
 | |
| func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {
 | |
| 	// Initialize the TLS Options
 | |
| 	tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	mounter := mount.New(s.ExperimentalMounterPath)
 | |
| 	var writer kubeio.Writer = &kubeio.StdWriter{}
 | |
| 	if s.Containerized {
 | |
| 		glog.V(2).Info("Running kubelet in containerized mode (experimental)")
 | |
| 		mounter = mount.NewNsenterMounter()
 | |
| 		writer = &kubeio.NsenterWriter{}
 | |
| 	}
 | |
| 
 | |
| 	var dockerClient libdocker.Interface
 | |
| 	if s.ContainerRuntime == "docker" {
 | |
| 		dockerClient = libdocker.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration,
 | |
| 			s.ImagePullProgressDeadline.Duration)
 | |
| 	} else {
 | |
| 		dockerClient = nil
 | |
| 	}
 | |
| 
 | |
| 	return &kubelet.KubeletDeps{
 | |
| 		Auth:               nil, // default does not enforce auth[nz]
 | |
| 		CAdvisorInterface:  nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
 | |
| 		Cloud:              nil, // cloud provider might start background processes
 | |
| 		ContainerManager:   nil,
 | |
| 		DockerClient:       dockerClient,
 | |
| 		KubeClient:         nil,
 | |
| 		ExternalKubeClient: nil,
 | |
| 		Mounter:            mounter,
 | |
| 		NetworkPlugins:     ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir),
 | |
| 		OOMAdjuster:        oom.NewOOMAdjuster(),
 | |
| 		OSInterface:        kubecontainer.RealOS{},
 | |
| 		Writer:             writer,
 | |
| 		VolumePlugins:      ProbeVolumePlugins(s.VolumePluginDir),
 | |
| 		TLSOptions:         tlsOptions,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) {
 | |
| 	clientConfig, err := CreateAPIServerClientConfig(s)
 | |
| 	if err == nil {
 | |
| 		kubeClient, err := clientset.NewForConfig(clientConfig)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		return kubeClient, nil
 | |
| 	}
 | |
| 	return nil, err
 | |
| }
 | |
| 
 | |
| // Tries to download the kubelet-<node-name> configmap from "kube-system" namespace via the API server and returns a JSON string or error
 | |
| func getRemoteKubeletConfig(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (string, error) {
 | |
| 	// TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request
 | |
| 	kubeClient, err := getKubeClient(s)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	configmap, err := func() (*v1.ConfigMap, error) {
 | |
| 		var nodename types.NodeName
 | |
| 		hostname := nodeutil.GetHostname(s.HostnameOverride)
 | |
| 
 | |
| 		if kubeDeps != nil && kubeDeps.Cloud != nil {
 | |
| 			instances, ok := kubeDeps.Cloud.Instances()
 | |
| 			if !ok {
 | |
| 				err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename")
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			nodename, err = instances.CurrentNodeName(hostname)
 | |
| 			if err != nil {
 | |
| 				err = fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			// look for kubelet-<node-name> configmap from "kube-system"
 | |
| 			configmap, err := kubeClient.CoreV1Client.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", nodename), metav1.GetOptions{})
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			return configmap, nil
 | |
| 		}
 | |
| 		// No cloud provider yet, so can't get the nodename via Cloud.Instances().CurrentNodeName(hostname), try just using the hostname
 | |
| 		configmap, err := kubeClient.CoreV1Client.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", hostname), metav1.GetOptions{})
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("cloud provider was nil, and attempt to use hostname to find config resulted in: %v", err)
 | |
| 		}
 | |
| 		return configmap, nil
 | |
| 	}()
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	// When we create the KubeletConfiguration configmap, we put a json string
 | |
| 	// representation of the config in a `kubelet.config` key.
 | |
| 	jsonstr, ok := configmap.Data["kubelet.config"]
 | |
| 	if !ok {
 | |
| 		return "", fmt.Errorf("KubeletConfiguration configmap did not contain a value with key `kubelet.config`")
 | |
| 	}
 | |
| 
 | |
| 	return jsonstr, nil
 | |
| }
 | |
| 
 | |
| func startKubeletConfigSyncLoop(s *options.KubeletServer, currentKC string) {
 | |
| 	glog.Infof("Starting Kubelet configuration sync loop")
 | |
| 	go func() {
 | |
| 		wait.PollInfinite(30*time.Second, func() (bool, error) {
 | |
| 			glog.Infof("Checking API server for new Kubelet configuration.")
 | |
| 			remoteKC, err := getRemoteKubeletConfig(s, nil)
 | |
| 			if err == nil {
 | |
| 				// Detect new config by comparing with the last JSON string we extracted.
 | |
| 				if remoteKC != currentKC {
 | |
| 					glog.Info("Found new Kubelet configuration via API server, restarting!")
 | |
| 					os.Exit(0)
 | |
| 				}
 | |
| 			} else {
 | |
| 				glog.Infof("Did not find a configuration for this Kubelet via API server: %v", err)
 | |
| 			}
 | |
| 			return false, nil // Always return (false, nil) so we poll forever.
 | |
| 		})
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // Try to check for config on the API server, return that config if we get it, and start
 | |
| // a background thread that checks for updates to configs.
 | |
| func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletConfiguration, error) {
 | |
| 	jsonstr, err := getRemoteKubeletConfig(s, nil)
 | |
| 	if err == nil {
 | |
| 		// We will compare future API server config against the config we just got (jsonstr):
 | |
| 		startKubeletConfigSyncLoop(s, jsonstr)
 | |
| 
 | |
| 		// Convert json from API server to external type struct, and convert that to internal type struct
 | |
| 		extKC := componentconfigv1alpha1.KubeletConfiguration{}
 | |
| 		err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(jsonstr), &extKC)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		api.Scheme.Default(&extKC)
 | |
| 		kc := componentconfig.KubeletConfiguration{}
 | |
| 		err = api.Scheme.Convert(&extKC, &kc, nil)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		return &kc, nil
 | |
| 	} else {
 | |
| 		// Couldn't get a configuration from the API server yet.
 | |
| 		// Restart as soon as anything comes back from the API server.
 | |
| 		startKubeletConfigSyncLoop(s, "")
 | |
| 		return nil, err
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Run runs the specified KubeletServer with the given KubeletDeps.  This should never exit.
 | |
| // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
 | |
| // Otherwise, the caller is assumed to have set up the KubeletDeps object and a default one will
 | |
| // not be generated.
 | |
| func Run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) error {
 | |
| 	if err := run(s, kubeDeps); err != nil {
 | |
| 		return fmt.Errorf("failed to run Kubelet: %v", err)
 | |
| 
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
// checkPermissions returns an error unless the process is running as uid 0.
func checkPermissions() error {
	uid := os.Getuid()
	if uid == 0 {
		// TODO: Check if kubelet is running in the `initial` user namespace.
		// http://man7.org/linux/man-pages/man7/user_namespaces.7.html
		return nil
	}
	return fmt.Errorf("Kubelet needs to run as uid `0`. It is being run as %d", uid)
}
 | |
| 
 | |
| func setConfigz(cz *configz.Config, kc *componentconfig.KubeletConfiguration) {
 | |
| 	tmp := componentconfigv1alpha1.KubeletConfiguration{}
 | |
| 	api.Scheme.Convert(kc, &tmp, nil)
 | |
| 	cz.Set(tmp)
 | |
| }
 | |
| 
 | |
| func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, error) {
 | |
| 	cz, err := configz.New("componentconfig")
 | |
| 	if err == nil {
 | |
| 		setConfigz(cz, kc)
 | |
| 	} else {
 | |
| 		glog.Errorf("unable to register configz: %s", err)
 | |
| 	}
 | |
| 	return cz, err
 | |
| }
 | |
| 
 | |
| // validateConfig validates configuration of Kubelet and returns an error is the input configuration is invalid.
 | |
| func validateConfig(s *options.KubeletServer) error {
 | |
| 	if !s.CgroupsPerQOS && len(s.EnforceNodeAllocatable) > 0 {
 | |
| 		return fmt.Errorf("Node Allocatable enforcement is not supported unless Cgroups Per QOS feature is turned on")
 | |
| 	}
 | |
| 	if s.SystemCgroups != "" && s.CgroupRoot == "" {
 | |
| 		return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
 | |
| 	}
 | |
| 	for _, val := range s.EnforceNodeAllocatable {
 | |
| 		switch val {
 | |
| 		case cm.NodeAllocatableEnforcementKey:
 | |
| 		case cm.SystemReservedEnforcementKey:
 | |
| 		case cm.KubeReservedEnforcementKey:
 | |
| 			continue
 | |
| 		default:
 | |
| 			return fmt.Errorf("invalid option %q specified for EnforceNodeAllocatable setting. Valid options are %q, %q or %q", val, cm.NodeAllocatableEnforcementKey, cm.SystemReservedEnforcementKey, cm.KubeReservedEnforcementKey)
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // makeEventRecorder sets up kubeDeps.Recorder if its nil. Its a no-op otherwise.
 | |
| func makeEventRecorder(s *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, nodeName types.NodeName) {
 | |
| 	if kubeDeps.Recorder != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	eventBroadcaster := record.NewBroadcaster()
 | |
| 	kubeDeps.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: componentKubelet, Host: string(nodeName)})
 | |
| 	eventBroadcaster.StartLogging(glog.V(3).Infof)
 | |
| 	if kubeDeps.EventClient != nil {
 | |
| 		glog.V(4).Infof("Sending events to api server.")
 | |
| 		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
 | |
| 	} else {
 | |
| 		glog.Warning("No api server defined - no events will be sent to API server.")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
 | |
| 	// TODO: this should be replaced by a --standalone flag
 | |
| 	standaloneMode := (len(s.APIServerList) == 0 && !s.RequireKubeConfig)
 | |
| 
 | |
| 	if s.ExitOnLockContention && s.LockFilePath == "" {
 | |
| 		return errors.New("cannot exit on lock file contention: no lock file specified")
 | |
| 	}
 | |
| 
 | |
| 	done := make(chan struct{})
 | |
| 	if s.LockFilePath != "" {
 | |
| 		glog.Infof("acquiring file lock on %q", s.LockFilePath)
 | |
| 		if err := flock.Acquire(s.LockFilePath); err != nil {
 | |
| 			return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
 | |
| 		}
 | |
| 		if s.ExitOnLockContention {
 | |
| 			glog.Infof("watching for inotify events for: %v", s.LockFilePath)
 | |
| 			if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Set feature gates based on the value in KubeletConfiguration
 | |
| 	err = utilfeature.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Register current configuration with /configz endpoint
 | |
| 	cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
 | |
| 	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
 | |
| 		// Look for config on the API server. If it exists, replace s.KubeletConfiguration
 | |
| 		// with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.
 | |
| 
 | |
| 		// Don't do dynamic Kubelet configuration in runonce mode
 | |
| 		if s.RunOnce == false {
 | |
| 			remoteKC, err := initKubeletConfigSync(s)
 | |
| 			if err == nil {
 | |
| 				// Update s (KubeletServer) with new config from API server
 | |
| 				s.KubeletConfiguration = *remoteKC
 | |
| 				// Ensure that /configz is up to date with the new config
 | |
| 				if cfgzErr != nil {
 | |
| 					glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
 | |
| 				} else {
 | |
| 					setConfigz(cfgz, &s.KubeletConfiguration)
 | |
| 				}
 | |
| 				// Update feature gates from the new config
 | |
| 				err = utilfeature.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
 | |
| 				if err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 			} else {
 | |
| 				glog.Errorf("failed to init dynamic Kubelet configuration sync: %v", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Validate configuration.
 | |
| 	if err := validateConfig(s); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if kubeDeps == nil {
 | |
| 		var kubeClient clientset.Interface
 | |
| 		var eventClient v1core.EventsGetter
 | |
| 		var externalKubeClient clientgoclientset.Interface
 | |
| 		var cloud cloudprovider.Interface
 | |
| 
 | |
| 		if !cloudprovider.IsExternal(s.CloudProvider) && s.CloudProvider != componentconfigv1alpha1.AutoDetectCloudProvider {
 | |
| 			cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			if cloud == nil {
 | |
| 				glog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
 | |
| 			} else {
 | |
| 				glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		nodeName, err := getNodeName(cloud, nodeutil.GetHostname(s.HostnameOverride))
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		if s.BootstrapKubeconfig != "" {
 | |
| 			if err := bootstrapClientCert(s.KubeConfig.Value(), s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		clientConfig, err := CreateAPIServerClientConfig(s)
 | |
| 		if err == nil {
 | |
| 			kubeClient, err = clientset.NewForConfig(clientConfig)
 | |
| 			if err != nil {
 | |
| 				glog.Warningf("New kubeClient from clientConfig error: %v", err)
 | |
| 			}
 | |
| 			externalKubeClient, err = clientgoclientset.NewForConfig(clientConfig)
 | |
| 			if err != nil {
 | |
| 				glog.Warningf("New kubeClient from clientConfig error: %v", err)
 | |
| 			}
 | |
| 			// make a separate client for events
 | |
| 			eventClientConfig := *clientConfig
 | |
| 			eventClientConfig.QPS = float32(s.EventRecordQPS)
 | |
| 			eventClientConfig.Burst = int(s.EventBurst)
 | |
| 			eventClient, err = clientgoclientset.NewForConfig(&eventClientConfig)
 | |
| 			if err != nil {
 | |
| 				glog.Warningf("Failed to create API Server client: %v", err)
 | |
| 			}
 | |
| 		} else {
 | |
| 			if s.RequireKubeConfig {
 | |
| 				return fmt.Errorf("invalid kubeconfig: %v", err)
 | |
| 			}
 | |
| 			if standaloneMode {
 | |
| 				glog.Warningf("No API client: %v", err)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		kubeDeps, err = UnsecuredKubeletDeps(s)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		kubeDeps.Cloud = cloud
 | |
| 		kubeDeps.KubeClient = kubeClient
 | |
| 		kubeDeps.ExternalKubeClient = externalKubeClient
 | |
| 		kubeDeps.EventClient = eventClient
 | |
| 	}
 | |
| 
 | |
| 	nodeName, err := getNodeName(kubeDeps.Cloud, nodeutil.GetHostname(s.HostnameOverride))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if kubeDeps.Auth == nil {
 | |
| 		auth, err := BuildAuth(nodeName, kubeDeps.ExternalKubeClient, s.KubeletConfiguration)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		kubeDeps.Auth = auth
 | |
| 	}
 | |
| 
 | |
| 	if kubeDeps.CAdvisorInterface == nil {
 | |
| 		kubeDeps.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), s.ContainerRuntime, s.RootDirectory)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Setup event recorder if required.
 | |
| 	makeEventRecorder(&s.KubeletConfiguration, kubeDeps, nodeName)
 | |
| 
 | |
| 	if kubeDeps.ContainerManager == nil {
 | |
| 		if s.CgroupsPerQOS && s.CgroupRoot == "" {
 | |
| 			glog.Infof("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
 | |
| 			s.CgroupRoot = "/"
 | |
| 		}
 | |
| 		kubeReserved, err := parseResourceList(s.KubeReserved)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		systemReserved, err := parseResourceList(s.SystemReserved)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		var hardEvictionThresholds []evictionapi.Threshold
 | |
| 		// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
 | |
| 		if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
 | |
| 			hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, "", "", "")
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 		experimentalQOSReserved, err := cm.ParseQOSReserved(s.ExperimentalQOSReserved)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		kubeDeps.ContainerManager, err = cm.NewContainerManager(
 | |
| 			kubeDeps.Mounter,
 | |
| 			kubeDeps.CAdvisorInterface,
 | |
| 			cm.NodeConfig{
 | |
| 				RuntimeCgroupsName:    s.RuntimeCgroups,
 | |
| 				SystemCgroupsName:     s.SystemCgroups,
 | |
| 				KubeletCgroupsName:    s.KubeletCgroups,
 | |
| 				ContainerRuntime:      s.ContainerRuntime,
 | |
| 				CgroupsPerQOS:         s.CgroupsPerQOS,
 | |
| 				CgroupRoot:            s.CgroupRoot,
 | |
| 				CgroupDriver:          s.CgroupDriver,
 | |
| 				ProtectKernelDefaults: s.ProtectKernelDefaults,
 | |
| 				NodeAllocatableConfig: cm.NodeAllocatableConfig{
 | |
| 					KubeReservedCgroupName:   s.KubeReservedCgroup,
 | |
| 					SystemReservedCgroupName: s.SystemReservedCgroup,
 | |
| 					EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
 | |
| 					KubeReserved:             kubeReserved,
 | |
| 					SystemReserved:           systemReserved,
 | |
| 					HardEvictionThresholds:   hardEvictionThresholds,
 | |
| 				},
 | |
| 				ExperimentalQOSReserved: *experimentalQOSReserved,
 | |
| 			},
 | |
| 			s.ExperimentalFailSwapOn,
 | |
| 			kubeDeps.Recorder)
 | |
| 
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if err := checkPermissions(); err != nil {
 | |
| 		glog.Error(err)
 | |
| 	}
 | |
| 
 | |
| 	utilruntime.ReallyCrash = s.ReallyCrashForTesting
 | |
| 
 | |
| 	rand.Seed(time.Now().UTC().UnixNano())
 | |
| 
 | |
| 	// TODO(vmarmol): Do this through container config.
 | |
| 	oomAdjuster := kubeDeps.OOMAdjuster
 | |
| 	if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
 | |
| 		glog.Warning(err)
 | |
| 	}
 | |
| 
 | |
| 	if err := RunKubelet(&s.KubeletFlags, &s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if s.HealthzPort > 0 {
 | |
| 		healthz.DefaultHealthz()
 | |
| 		go wait.Until(func() {
 | |
| 			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
 | |
| 			if err != nil {
 | |
| 				glog.Errorf("Starting health server failed: %v", err)
 | |
| 			}
 | |
| 		}, 5*time.Second, wait.NeverStop)
 | |
| 	}
 | |
| 
 | |
| 	if s.RunOnce {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	<-done
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // getNodeName returns the node name according to the cloud provider
 | |
| // if cloud provider is specified. Otherwise, returns the hostname of the node.
 | |
| func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
 | |
| 	if cloud == nil {
 | |
| 		return types.NodeName(hostname), nil
 | |
| 	}
 | |
| 
 | |
| 	instances, ok := cloud.Instances()
 | |
| 	if !ok {
 | |
| 		return "", fmt.Errorf("failed to get instances from cloud provider")
 | |
| 	}
 | |
| 
 | |
| 	nodeName, err := instances.CurrentNodeName(hostname)
 | |
| 	if err != nil {
 | |
| 		return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
 | |
| 
 | |
| 	return nodeName, nil
 | |
| }
 | |
| 
 | |
| // InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
 | |
| // certificate and key file are generated. Returns a configured server.TLSOptions object.
 | |
| func InitializeTLS(kf *options.KubeletFlags, kc *componentconfig.KubeletConfiguration) (*server.TLSOptions, error) {
 | |
| 	if kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
 | |
| 		kc.TLSCertFile = path.Join(kc.CertDirectory, "kubelet.crt")
 | |
| 		kc.TLSPrivateKeyFile = path.Join(kc.CertDirectory, "kubelet.key")
 | |
| 
 | |
| 		canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if !canReadCertAndKey {
 | |
| 			cert, key, err := certutil.GenerateSelfSignedCertKey(nodeutil.GetHostname(kf.HostnameOverride), nil, nil)
 | |
| 			if err != nil {
 | |
| 				return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
 | |
| 			}
 | |
| 
 | |
| 			if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			if err := certutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			glog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
 | |
| 		}
 | |
| 	}
 | |
| 	tlsOptions := &server.TLSOptions{
 | |
| 		Config: &tls.Config{
 | |
| 			// Can't use SSLv3 because of POODLE and BEAST
 | |
| 			// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
 | |
| 			// Can't use TLSv1.1 because of RC4 cipher usage
 | |
| 			MinVersion: tls.VersionTLS12,
 | |
| 		},
 | |
| 		CertFile: kc.TLSCertFile,
 | |
| 		KeyFile:  kc.TLSPrivateKeyFile,
 | |
| 	}
 | |
| 
 | |
| 	if len(kc.Authentication.X509.ClientCAFile) > 0 {
 | |
| 		clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
 | |
| 		}
 | |
| 		// Specify allowed CAs for client certificates
 | |
| 		tlsOptions.Config.ClientCAs = clientCAs
 | |
| 		// Populate PeerCertificates in requests, but don't reject connections without verified certificates
 | |
| 		tlsOptions.Config.ClientAuth = tls.RequestClientCert
 | |
| 	}
 | |
| 
 | |
| 	return tlsOptions, nil
 | |
| }
 | |
| 
 | |
| func kubeconfigClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
 | |
| 	if s.RequireKubeConfig {
 | |
| 		// Ignores the values of s.APIServerList
 | |
| 		return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
 | |
| 			&clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig.Value()},
 | |
| 			&clientcmd.ConfigOverrides{},
 | |
| 		).ClientConfig()
 | |
| 	}
 | |
| 	return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
 | |
| 		&clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig.Value()},
 | |
| 		&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.APIServerList[0]}},
 | |
| 	).ClientConfig()
 | |
| }
 | |
| 
 | |
| // createClientConfig creates a client configuration from the command line
 | |
| // arguments. If --kubeconfig is explicitly set, it will be used. If it is
 | |
| // not set, we attempt to load the default kubeconfig file, and if we cannot,
 | |
| // we fall back to the default client with no auth - this fallback does not, in
 | |
| // and of itself, constitute an error.
 | |
| func createClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
 | |
| 	if s.RequireKubeConfig {
 | |
| 		return kubeconfigClientConfig(s)
 | |
| 	}
 | |
| 
 | |
| 	// TODO: handle a new --standalone flag that bypasses kubeconfig loading and returns no error.
 | |
| 	// DEPRECATED: all subsequent code is deprecated
 | |
| 	if len(s.APIServerList) == 0 {
 | |
| 		return nil, fmt.Errorf("no api servers specified")
 | |
| 	}
 | |
| 	// TODO: adapt Kube client to support LB over several servers
 | |
| 	if len(s.APIServerList) > 1 {
 | |
| 		glog.Infof("Multiple api servers specified.  Picking first one")
 | |
| 	}
 | |
| 
 | |
| 	if s.KubeConfig.Provided() {
 | |
| 		return kubeconfigClientConfig(s)
 | |
| 	}
 | |
| 	// If KubeConfig was not provided, try to load the default file, then fall back
 | |
| 	// to a default auth config.
 | |
| 	clientConfig, err := kubeconfigClientConfig(s)
 | |
| 	if err != nil {
 | |
| 		glog.Warningf("Could not load kubeconfig file %s: %v. Using default client config instead.", s.KubeConfig, err)
 | |
| 
 | |
| 		authInfo := &clientauth.Info{}
 | |
| 		authConfig, err := authInfo.MergeWithConfig(restclient.Config{})
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		authConfig.Host = s.APIServerList[0]
 | |
| 		clientConfig = &authConfig
 | |
| 	}
 | |
| 	return clientConfig, nil
 | |
| }
 | |
| 
 | |
| // CreateAPIServerClientConfig generates a client.Config from command line flags,
 | |
| // including api-server-list, via createClientConfig and then injects chaos into
 | |
| // the configuration via addChaosToClientConfig. This func is exported to support
 | |
| // integration with third party kubelet extensions (e.g. kubernetes-mesos).
 | |
| func CreateAPIServerClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
 | |
| 	clientConfig, err := createClientConfig(s)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	clientConfig.ContentType = s.ContentType
 | |
| 	// Override kubeconfig qps/burst settings from flags
 | |
| 	clientConfig.QPS = float32(s.KubeAPIQPS)
 | |
| 	clientConfig.Burst = int(s.KubeAPIBurst)
 | |
| 
 | |
| 	addChaosToClientConfig(s, clientConfig)
 | |
| 	return clientConfig, nil
 | |
| }
 | |
| 
 | |
| // addChaosToClientConfig injects random errors into client connections if configured.
 | |
| func addChaosToClientConfig(s *options.KubeletServer, config *restclient.Config) {
 | |
| 	if s.ChaosChance != 0.0 {
 | |
| 		config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
 | |
| 			seed := chaosclient.NewSeed(1)
 | |
| 			// TODO: introduce a standard chaos package with more tunables - this is just a proof of concept
 | |
| 			// TODO: introduce random latency and stalls
 | |
| 			return chaosclient.NewChaosRoundTripper(rt, chaosclient.LogChaos, seed.P(s.ChaosChance, chaosclient.ErrSimulatedConnectionResetByPeer))
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
 | |
| //   1 Integration tests
 | |
| //   2 Kubelet binary
 | |
| //   3 Standalone 'kubernetes' binary
 | |
| // Eventually, #2 will be replaced with instances of #3
 | |
| func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
 | |
| 	hostname := nodeutil.GetHostname(kubeFlags.HostnameOverride)
 | |
| 	// Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
 | |
| 	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// Setup event recorder if required.
 | |
| 	makeEventRecorder(kubeCfg, kubeDeps, nodeName)
 | |
| 
 | |
| 	// TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig,
 | |
| 	//                so that I could remove the associated fields from KubeletConfig. I would
 | |
| 	//                prefer this to be done as part of an independent validation step on the
 | |
| 	//                KubeletConfiguration. But as far as I can tell, we don't have an explicit
 | |
| 	//                place for validation of the KubeletConfiguration yet.
 | |
| 	hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	privilegedSources := capabilities.PrivilegedSources{
 | |
| 		HostNetworkSources: hostNetworkSources,
 | |
| 		HostPIDSources:     hostPIDSources,
 | |
| 		HostIPCSources:     hostIPCSources,
 | |
| 	}
 | |
| 	capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)
 | |
| 
 | |
| 	credentialprovider.SetPreferredDockercfgPath(kubeCfg.RootDirectory)
 | |
| 	glog.V(2).Infof("Using root directory: %v", kubeCfg.RootDirectory)
 | |
| 
 | |
| 	builder := kubeDeps.Builder
 | |
| 	if builder == nil {
 | |
| 		builder = CreateAndInitKubelet
 | |
| 	}
 | |
| 	if kubeDeps.OSInterface == nil {
 | |
| 		kubeDeps.OSInterface = kubecontainer.RealOS{}
 | |
| 	}
 | |
| 	k, err := builder(kubeCfg, kubeDeps, standaloneMode, kubeFlags.HostnameOverride, kubeFlags.NodeIP, kubeFlags.DockershimRootDirectory, kubeFlags.ProviderID)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to create kubelet: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	// NewMainKubelet should have set up a pod source config if one didn't exist
 | |
| 	// when the builder was run. This is just a precaution.
 | |
| 	if kubeDeps.PodConfig == nil {
 | |
| 		return fmt.Errorf("failed to create kubelet, pod source config was nil")
 | |
| 	}
 | |
| 	podCfg := kubeDeps.PodConfig
 | |
| 
 | |
| 	rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))
 | |
| 
 | |
| 	// TODO(dawnchen): remove this once we deprecated old debian containervm images.
 | |
| 	// This is a workaround for issue: https://github.com/opencontainers/runc/issues/726
 | |
| 	// The current chosen number is consistent with most of other os dist.
 | |
| 	const maxKeysPath = "/proc/sys/kernel/keys/root_maxkeys"
 | |
| 	const minKeys uint64 = 1000000
 | |
| 	key, err := ioutil.ReadFile(maxKeysPath)
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Cannot read keys quota in %s", maxKeysPath)
 | |
| 	} else {
 | |
| 		fields := strings.Fields(string(key))
 | |
| 		nKey, _ := strconv.ParseUint(fields[0], 10, 64)
 | |
| 		if nKey < minKeys {
 | |
| 			glog.Infof("Setting keys quota in %s to %d", maxKeysPath, minKeys)
 | |
| 			err = ioutil.WriteFile(maxKeysPath, []byte(fmt.Sprintf("%d", uint64(minKeys))), 0644)
 | |
| 			if err != nil {
 | |
| 				glog.Warningf("Failed to update %s: %v", maxKeysPath, err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	const maxBytesPath = "/proc/sys/kernel/keys/root_maxbytes"
 | |
| 	const minBytes uint64 = 25000000
 | |
| 	bytes, err := ioutil.ReadFile(maxBytesPath)
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Cannot read keys bytes in %s", maxBytesPath)
 | |
| 	} else {
 | |
| 		fields := strings.Fields(string(bytes))
 | |
| 		nByte, _ := strconv.ParseUint(fields[0], 10, 64)
 | |
| 		if nByte < minBytes {
 | |
| 			glog.Infof("Setting keys bytes in %s to %d", maxBytesPath, minBytes)
 | |
| 			err = ioutil.WriteFile(maxBytesPath, []byte(fmt.Sprintf("%d", uint64(minBytes))), 0644)
 | |
| 			if err != nil {
 | |
| 				glog.Warningf("Failed to update %s: %v", maxBytesPath, err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// process pods and exit.
 | |
| 	if runOnce {
 | |
| 		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
 | |
| 			return fmt.Errorf("runonce failed: %v", err)
 | |
| 		}
 | |
| 		glog.Infof("Started kubelet %s as runonce", version.Get().String())
 | |
| 	} else {
 | |
| 		startKubelet(k, podCfg, kubeCfg, kubeDeps)
 | |
| 		glog.Infof("Started kubelet %s", version.Get().String())
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {
 | |
| 	// start the kubelet
 | |
| 	go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)
 | |
| 
 | |
| 	// start the kubelet server
 | |
| 	if kubeCfg.EnableServer {
 | |
| 		go wait.Until(func() {
 | |
| 			k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
 | |
| 		}, 0, wait.NeverStop)
 | |
| 	}
 | |
| 	if kubeCfg.ReadOnlyPort > 0 {
 | |
| 		go wait.Until(func() {
 | |
| 			k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
 | |
| 		}, 0, wait.NeverStop)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir, providerID string) (k kubelet.KubeletBootstrap, err error) {
 | |
| 	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
 | |
| 	// up into "per source" synchronizations
 | |
| 
 | |
| 	k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode, hostnameOverride, nodeIP, dockershimRootDir, providerID)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	k.BirthCry()
 | |
| 
 | |
| 	k.StartGarbageCollection()
 | |
| 
 | |
| 	return k, nil
 | |
| }
 | |
| 
 | |
| // parseResourceList parses the given configuration map into an API
 | |
| // ResourceList or returns an error.
 | |
| func parseResourceList(m componentconfig.ConfigurationMap) (v1.ResourceList, error) {
 | |
| 	if len(m) == 0 {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 	rl := make(v1.ResourceList)
 | |
| 	for k, v := range m {
 | |
| 		switch v1.ResourceName(k) {
 | |
| 		// Only CPU and memory resources are supported.
 | |
| 		case v1.ResourceCPU, v1.ResourceMemory:
 | |
| 			q, err := resource.ParseQuantity(v)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			if q.Sign() == -1 {
 | |
| 				return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
 | |
| 			}
 | |
| 			rl[v1.ResourceName(k)] = q
 | |
| 		default:
 | |
| 			return nil, fmt.Errorf("cannot reserve %q resource", k)
 | |
| 		}
 | |
| 	}
 | |
| 	return rl, nil
 | |
| }
 | |
| 
 | |
| // RunDockershim only starts the dockershim in current process. This is only used for cri validate testing purpose
 | |
| // TODO(random-liu): Move this to a separate binary.
 | |
| func RunDockershim(c *componentconfig.KubeletConfiguration, dockershimRootDir string) error {
 | |
| 	// Create docker client.
 | |
| 	dockerClient := libdocker.ConnectToDockerOrDie(c.DockerEndpoint, c.RuntimeRequestTimeout.Duration,
 | |
| 		c.ImagePullProgressDeadline.Duration)
 | |
| 
 | |
| 	// Initialize network plugin settings.
 | |
| 	binDir := c.CNIBinDir
 | |
| 	if binDir == "" {
 | |
| 		binDir = c.NetworkPluginDir
 | |
| 	}
 | |
| 	nh := &kubelet.NoOpLegacyHost{}
 | |
| 	pluginSettings := dockershim.NetworkPluginSettings{
 | |
| 		HairpinMode:       componentconfig.HairpinMode(c.HairpinMode),
 | |
| 		NonMasqueradeCIDR: c.NonMasqueradeCIDR,
 | |
| 		PluginName:        c.NetworkPluginName,
 | |
| 		PluginConfDir:     c.CNIConfDir,
 | |
| 		PluginBinDir:      binDir,
 | |
| 		MTU:               int(c.NetworkPluginMTU),
 | |
| 		LegacyRuntimeHost: nh,
 | |
| 	}
 | |
| 
 | |
| 	// Initialize streaming configuration. (Not using TLS now)
 | |
| 	streamingConfig := &streaming.Config{
 | |
| 		// Use a relative redirect (no scheme or host).
 | |
| 		BaseURL:                         &url.URL{Path: "/cri/"},
 | |
| 		StreamIdleTimeout:               c.StreamingConnectionIdleTimeout.Duration,
 | |
| 		StreamCreationTimeout:           streaming.DefaultConfig.StreamCreationTimeout,
 | |
| 		SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
 | |
| 		SupportedPortForwardProtocols:   streaming.DefaultConfig.SupportedPortForwardProtocols,
 | |
| 	}
 | |
| 
 | |
| 	ds, err := dockershim.NewDockerService(dockerClient, c.SeccompProfileRoot, c.PodInfraContainerImage,
 | |
| 		streamingConfig, &pluginSettings, c.RuntimeCgroups, c.CgroupDriver, c.DockerExecHandlerName, dockershimRootDir,
 | |
| 		c.DockerDisableSharedPID)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := ds.Start(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// The unix socket for kubelet <-> dockershim communication.
 | |
| 	ep := c.RemoteRuntimeEndpoint
 | |
| 	if len(ep) == 0 {
 | |
| 		ep = "/var/run/dockershim.sock"
 | |
| 	}
 | |
| 
 | |
| 	glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
 | |
| 	server := dockerremote.NewDockerServer(ep, ds)
 | |
| 	if err := server.Start(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Start the streaming server
 | |
| 	addr := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
 | |
| 	return http.ListenAndServe(addr, ds)
 | |
| }
 | 
