Mirror of https://github.com/optim-enterprises-bv/kubernetes.git
Synced 2025-10-30 17:58:14 +00:00

Commit e9de1b0221: Automatic merge from submit-queue (batch tested with PRs 41236, 40992) Removed unnecessarly empty line.

2162 lines · 81 KiB · Go
| /*
 | |
| Copyright 2015 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package kubelet
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"net"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"os"
 | |
| 	"path"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/golang/glog"
 | |
| 
 | |
| 	clientgoclientset "k8s.io/client-go/kubernetes"
 | |
| 
 | |
| 	cadvisorapi "github.com/google/cadvisor/info/v1"
 | |
| 	"k8s.io/apimachinery/pkg/api/resource"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/fields"
 | |
| 	"k8s.io/apimachinery/pkg/labels"
 | |
| 	"k8s.io/apimachinery/pkg/types"
 | |
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/sets"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | |
| 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 | |
| 	clientv1 "k8s.io/client-go/pkg/api/v1"
 | |
| 	"k8s.io/client-go/tools/cache"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	"k8s.io/client-go/util/clock"
 | |
| 	"k8s.io/client-go/util/flowcontrol"
 | |
| 	"k8s.io/client-go/util/integer"
 | |
| 	"k8s.io/kubernetes/pkg/api"
 | |
| 	"k8s.io/kubernetes/pkg/api/v1"
 | |
| 	"k8s.io/kubernetes/pkg/apis/componentconfig"
 | |
| 	componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 | |
| 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 | |
| 	"k8s.io/kubernetes/pkg/client/legacylisters"
 | |
| 	"k8s.io/kubernetes/pkg/cloudprovider"
 | |
| 	"k8s.io/kubernetes/pkg/features"
 | |
| 	internalapi "k8s.io/kubernetes/pkg/kubelet/api"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/cm"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/config"
 | |
| 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/dockershim"
 | |
| 	dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/events"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/images"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/network"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/pleg"
 | |
| 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/prober"
 | |
| 	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/remote"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/rkt"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/secret"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/server"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/server/stats"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/server/streaming"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/status"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/sysctl"
 | |
| 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/util/format"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 | |
| 	"k8s.io/kubernetes/pkg/security/apparmor"
 | |
| 	"k8s.io/kubernetes/pkg/util/bandwidth"
 | |
| 	utildbus "k8s.io/kubernetes/pkg/util/dbus"
 | |
| 	utilexec "k8s.io/kubernetes/pkg/util/exec"
 | |
| 	kubeio "k8s.io/kubernetes/pkg/util/io"
 | |
| 	utilipt "k8s.io/kubernetes/pkg/util/iptables"
 | |
| 	"k8s.io/kubernetes/pkg/util/mount"
 | |
| 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 | |
| 	"k8s.io/kubernetes/pkg/util/oom"
 | |
| 	"k8s.io/kubernetes/pkg/util/procfs"
 | |
| 	"k8s.io/kubernetes/pkg/volume"
 | |
| 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	// Max amount of time to wait for the container runtime to come up.
 | |
| 	maxWaitForContainerRuntime = 30 * time.Second
 | |
| 
 | |
| 	// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
 | |
| 	nodeStatusUpdateRetry = 5
 | |
| 
 | |
| 	// Location of container logs.
 | |
| 	ContainerLogsDir = "/var/log/containers"
 | |
| 
 | |
| 	// max backoff period, exported for the e2e test
 | |
| 	MaxContainerBackOff = 300 * time.Second
 | |
| 
 | |
| 	// Capacity of the channel for storing pods to kill. A small number should
 | |
| 	// suffice because a goroutine is dedicated to check the channel and does
 | |
| 	// not block on anything else.
 | |
| 	podKillingChannelCapacity = 50
 | |
| 
 | |
| 	// Period for performing global cleanup tasks.
 | |
| 	housekeepingPeriod = time.Second * 2
 | |
| 
 | |
| 	// Period for performing eviction monitoring.
 | |
| 	// TODO ensure this is in sync with internal cadvisor housekeeping.
 | |
| 	evictionMonitoringPeriod = time.Second * 10
 | |
| 
 | |
| 	// The path in containers' filesystems where the hosts file is mounted.
 | |
| 	etcHostsPath = "/etc/hosts"
 | |
| 
 | |
| 	// Capacity of the channel for receiving pod lifecycle events. This number
 | |
| 	// is a bit arbitrary and may be adjusted in the future.
 | |
| 	plegChannelCapacity = 1000
 | |
| 
 | |
| 	// Generic PLEG relies on relisting for discovering container events.
 | |
| 	// A longer period means that kubelet will take longer to detect container
 | |
| 	// changes and to update pod status. On the other hand, a shorter period
 | |
| 	// will cause more frequent relisting (e.g., container runtime operations),
 | |
| 	// leading to higher cpu usage.
 | |
| 	// Note that even though we set the period to 1s, the relisting itself can
 | |
| 	// take more than 1s to finish if the container runtime responds slowly
 | |
| 	// and/or when there are many container changes in one cycle.
 | |
| 	plegRelistPeriod = time.Second * 1
 | |
| 
 | |
| 	// backOffPeriod is the period to back off when pod syncing results in an
 | |
| 	// error. It is also used as the base period for the exponential backoff
 | |
| 	// container restarts and image pulls.
 | |
| 	backOffPeriod = time.Second * 10
 | |
| 
 | |
| 	// Period for performing container garbage collection.
 | |
| 	ContainerGCPeriod = time.Minute
 | |
| 	// Period for performing image garbage collection.
 | |
| 	ImageGCPeriod = 5 * time.Minute
 | |
| 
 | |
| 	// Minimum number of dead containers to keep in a pod
 | |
| 	minDeadContainerInPod = 1
 | |
| )
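| // To make the backoff constants above concrete: a flowcontrol backoff built
| // from backOffPeriod and MaxContainerBackOff roughly doubles on each repeated
| // failure, so successive delays look like 10s, 20s, 40s, 80s, 160s and are
| // then capped at 300s (an illustrative trace of the doubling-with-cap
| // behavior, not an exact log of any one code path).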
 | |
| 
 | |
| // SyncHandler is an interface implemented by Kubelet, for testability
 | |
| type SyncHandler interface {
 | |
| 	HandlePodAdditions(pods []*v1.Pod)
 | |
| 	HandlePodUpdates(pods []*v1.Pod)
 | |
| 	HandlePodRemoves(pods []*v1.Pod)
 | |
| 	HandlePodReconcile(pods []*v1.Pod)
 | |
| 	HandlePodSyncs(pods []*v1.Pod)
 | |
| 	HandlePodCleanups() error
 | |
| }
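| // Because SyncHandler is small, tests can stand in a lightweight fake for the
| // Kubelet. A minimal sketch (the fake type below is hypothetical, not part of
| // this file) might just record the pods it was asked to add:
| //
| //	type fakeSyncHandler struct{ added []*v1.Pod }
| //
| //	func (f *fakeSyncHandler) HandlePodAdditions(pods []*v1.Pod) {
| //		f.added = append(f.added, pods...)
| //	}
| //
| //	// ...plus no-op implementations of the remaining SyncHandler methods.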
 | |
| 
 | |
| // Option is a functional option type for Kubelet
 | |
| type Option func(*Kubelet)
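| // An Option is just a closure over *Kubelet: callers collect Options in
| // KubeletDeps.Options and NewMainKubelet applies them with opt(klet) after the
| // struct is built. For illustration (WithMaxPods is a hypothetical helper, not
| // defined in this package):
| //
| //	func WithMaxPods(n int) Option {
| //		return func(k *Kubelet) { k.maxPods = n }
| //	}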
 | |
| 
 | |
| // KubeletBootstrap is the bootstrapping interface for the kubelet; it targets the initialization protocol.
 | |
| type KubeletBootstrap interface {
 | |
| 	GetConfiguration() componentconfig.KubeletConfiguration
 | |
| 	BirthCry()
 | |
| 	StartGarbageCollection()
 | |
| 	ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool)
 | |
| 	ListenAndServeReadOnly(address net.IP, port uint)
 | |
| 	Run(<-chan kubetypes.PodUpdate)
 | |
| 	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
 | |
| }
 | |
| 
 | |
| // KubeletBuilder creates and initializes a Kubelet instance.
 | |
| type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (KubeletBootstrap, error)
 | |
| 
 | |
| // KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed
 | |
| // at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
 | |
| // these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
 | |
| type KubeletDeps struct {
 | |
| 	// TODO(mtaufen): KubeletBuilder:
 | |
| 	//                Mesos currently uses this as a hook to let them make their own call to
 | |
| 	//                let them wrap the KubeletBootstrap that CreateAndInitKubelet returns with
 | |
| 	//                their own KubeletBootstrap. It's a useful hook. I need to think about what
 | |
| 	//                a nice home for it would be. There seems to be a trend, between this and
 | |
| 	//                the Options fields below, of providing hooks where you can add extra functionality
 | |
| 	//                to the Kubelet for your solution. Maybe we should centralize these sorts of things?
 | |
| 	Builder KubeletBuilder
 | |
| 
 | |
| 	// TODO(mtaufen): ContainerRuntimeOptions and Options:
 | |
| 	//                Arrays of functions that can do arbitrary things to the Kubelet and the Runtime
 | |
| 	//                seem like a difficult path to trace when it's time to debug something.
 | |
| 	//                I'm leaving these fields here for now, but there is likely an easier-to-follow
 | |
| 	//                way to support their intended use cases. E.g. ContainerRuntimeOptions
 | |
| 	//                is used by Mesos to set an environment variable in containers which has
 | |
| 	//                some connection to their container GC. It seems that Mesos intends to use
 | |
| 	//                Options to add additional node conditions that are updated as part of the
 | |
| 	//                Kubelet lifecycle (see https://github.com/kubernetes/kubernetes/pull/21521).
 | |
| 	//                We should think about providing more explicit ways of doing these things.
 | |
| 	ContainerRuntimeOptions []kubecontainer.Option
 | |
| 	Options                 []Option
 | |
| 
 | |
| 	// Injected Dependencies
 | |
| 	Auth               server.AuthInterface
 | |
| 	CAdvisorInterface  cadvisor.Interface
 | |
| 	Cloud              cloudprovider.Interface
 | |
| 	ContainerManager   cm.ContainerManager
 | |
| 	DockerClient       dockertools.DockerInterface
 | |
| 	EventClient        v1core.EventsGetter
 | |
| 	KubeClient         clientset.Interface
 | |
| 	ExternalKubeClient clientgoclientset.Interface
 | |
| 	Mounter            mount.Interface
 | |
| 	NetworkPlugins     []network.NetworkPlugin
 | |
| 	OOMAdjuster        *oom.OOMAdjuster
 | |
| 	OSInterface        kubecontainer.OSInterface
 | |
| 	PodConfig          *config.PodConfig
 | |
| 	Recorder           record.EventRecorder
 | |
| 	Writer             kubeio.Writer
 | |
| 	VolumePlugins      []volume.VolumePlugin
 | |
| 	TLSOptions         *server.TLSOptions
 | |
| }
 | |
| 
 | |
| // makePodSourceConfig creates a config.PodConfig from the given
 | |
| // KubeletConfiguration or returns an error.
 | |
| func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName types.NodeName) (*config.PodConfig, error) {
 | |
| 	manifestURLHeader := make(http.Header)
 | |
| 	if kubeCfg.ManifestURLHeader != "" {
 | |
| 		pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
 | |
| 		if len(pieces) != 2 {
 | |
| 			return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
 | |
| 		}
 | |
| 		manifestURLHeader.Set(pieces[0], pieces[1])
 | |
| 	}
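| 	// For example (illustrative value only), --manifest-url-header="X-Auth-Token:abc123"
| 	// splits into ["X-Auth-Token", "abc123"], so manifestURLHeader.Get("X-Auth-Token")
| 	// returns "abc123"; a value that itself contains a ':' fails the
| 	// len(pieces) != 2 check above.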
 | |
| 
 | |
| 	// source of all configuration
 | |
| 	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
 | |
| 
 | |
| 	// define file config source
 | |
| 	if kubeCfg.PodManifestPath != "" {
 | |
| 		glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
 | |
| 		config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
 | |
| 	}
 | |
| 
 | |
| 	// define url config source
 | |
| 	if kubeCfg.ManifestURL != "" {
 | |
| 		glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
 | |
| 		config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
 | |
| 	}
 | |
| 	if kubeDeps.KubeClient != nil {
 | |
| 		glog.Infof("Watching apiserver")
 | |
| 		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
 | |
| 	}
 | |
| 	return cfg, nil
 | |
| }
 | |
| 
 | |
| func getRuntimeAndImageServices(config *componentconfig.KubeletConfiguration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
 | |
| 	rs, err := remote.NewRemoteRuntimeService(config.RemoteRuntimeEndpoint, config.RuntimeRequestTimeout.Duration)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 	is, err := remote.NewRemoteImageService(config.RemoteImageEndpoint, config.RuntimeRequestTimeout.Duration)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 	return rs, is, err
 | |
| }
 | |
| 
 | |
| // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
 | |
| // No initialization of Kubelet and its modules should happen here.
 | |
| func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
 | |
| 	if kubeCfg.RootDirectory == "" {
 | |
| 		return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory)
 | |
| 	}
 | |
| 	if kubeCfg.SyncFrequency.Duration <= 0 {
 | |
| 		return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
 | |
| 	}
 | |
| 
 | |
| 	if kubeCfg.MakeIPTablesUtilChains {
 | |
| 		if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
 | |
| 			return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
 | |
| 		}
 | |
| 		if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
 | |
| 			return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
 | |
| 		}
 | |
| 		if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
 | |
| 			return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
 | |
| 		}
 | |
| 	}
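| 	// Each bit value selects a single bit of the 32-bit iptables fwmark space,
| 	// hence the [0, 31] range check; e.g. a masquerade bit of 14 corresponds to
| 	// the mark mask 1<<14 == 0x4000 (14 is only an example value here).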
 | |
| 
 | |
| 	hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
 | |
| 	// Query the cloud provider for our node name, default to hostname
 | |
| 	nodeName := types.NodeName(hostname)
 | |
| 	if kubeDeps.Cloud != nil {
 | |
| 		var err error
 | |
| 		instances, ok := kubeDeps.Cloud.Instances()
 | |
| 		if !ok {
 | |
| 			return nil, fmt.Errorf("failed to get instances from cloud provider")
 | |
| 		}
 | |
| 
 | |
| 		nodeName, err = instances.CurrentNodeName(hostname)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
 | |
| 		}
 | |
| 
 | |
| 		glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
 | |
| 	}
 | |
| 
 | |
| 	if kubeDeps.PodConfig == nil {
 | |
| 		var err error
 | |
| 		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	containerGCPolicy := kubecontainer.ContainerGCPolicy{
 | |
| 		MinAge:             kubeCfg.MinimumGCAge.Duration,
 | |
| 		MaxPerPodContainer: int(kubeCfg.MaxPerPodContainerCount),
 | |
| 		MaxContainers:      int(kubeCfg.MaxContainerCount),
 | |
| 	}
 | |
| 
 | |
| 	daemonEndpoints := &v1.NodeDaemonEndpoints{
 | |
| 		KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
 | |
| 	}
 | |
| 
 | |
| 	imageGCPolicy := images.ImageGCPolicy{
 | |
| 		MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
 | |
| 		HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
 | |
| 		LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
 | |
| 	}
 | |
| 
 | |
| 	diskSpacePolicy := DiskSpacePolicy{
 | |
| 		DockerFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
 | |
| 		RootFreeDiskMB:   int(kubeCfg.LowDiskSpaceThresholdMB),
 | |
| 	}
 | |
| 
 | |
| 	thresholds, err := eviction.ParseThresholdConfig(kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
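| 	// The thresholds parsed above come from flag strings such as
| 	// --eviction-hard="memory.available<100Mi" or
| 	// --eviction-soft="nodefs.available<10%" plus a matching grace period
| 	// (signals and numbers shown here are illustrative, not defaults).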
 | |
| 	evictionConfig := eviction.Config{
 | |
| 		PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
 | |
| 		MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
 | |
| 		Thresholds:               thresholds,
 | |
| 		KernelMemcgNotification:  kubeCfg.ExperimentalKernelMemcgNotification,
 | |
| 	}
 | |
| 
 | |
| 	reservation, err := ParseReservation(kubeCfg.KubeReserved, kubeCfg.SystemReserved)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var dockerExecHandler dockertools.ExecHandler
 | |
| 	switch kubeCfg.DockerExecHandlerName {
 | |
| 	case "native":
 | |
| 		dockerExecHandler = &dockertools.NativeExecHandler{}
 | |
| 	case "nsenter":
 | |
| 		dockerExecHandler = &dockertools.NsenterExecHandler{}
 | |
| 	default:
 | |
| 		glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
 | |
| 		dockerExecHandler = &dockertools.NativeExecHandler{}
 | |
| 	}
 | |
| 
 | |
| 	serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
 | |
| 	if kubeDeps.KubeClient != nil {
 | |
| 		serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
 | |
| 		cache.NewReflector(serviceLW, &v1.Service{}, serviceStore, 0).Run()
 | |
| 	}
 | |
| 	serviceLister := &listers.StoreToServiceLister{Indexer: serviceStore}
 | |
| 
 | |
| 	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
 | |
| 	if kubeDeps.KubeClient != nil {
 | |
| 		fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
 | |
| 		nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
 | |
| 		cache.NewReflector(nodeLW, &v1.Node{}, nodeStore, 0).Run()
 | |
| 	}
 | |
| 	nodeLister := &listers.StoreToNodeLister{Store: nodeStore}
 | |
| 	nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}
 | |
| 
 | |
| 	// TODO: get the real node object of ourself,
 | |
| 	// and use the real node name and UID.
 | |
| 	// TODO: what is namespace for node?
 | |
| 	nodeRef := &clientv1.ObjectReference{
 | |
| 		Kind:      "Node",
 | |
| 		Name:      string(nodeName),
 | |
| 		UID:       types.UID(nodeName),
 | |
| 		Namespace: "",
 | |
| 	}
 | |
| 
 | |
| 	diskSpaceManager, err := newDiskSpaceManager(kubeDeps.CAdvisorInterface, diskSpacePolicy)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
 | |
| 	}
 | |
| 	containerRefManager := kubecontainer.NewRefManager()
 | |
| 
 | |
| 	oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
 | |
| 
 | |
| 	klet := &Kubelet{
 | |
| 		hostname:                       hostname,
 | |
| 		nodeName:                       nodeName,
 | |
| 		dockerClient:                   kubeDeps.DockerClient,
 | |
| 		kubeClient:                     kubeDeps.KubeClient,
 | |
| 		rootDirectory:                  kubeCfg.RootDirectory,
 | |
| 		resyncInterval:                 kubeCfg.SyncFrequency.Duration,
 | |
| 		containerRefManager:            containerRefManager,
 | |
| 		httpClient:                     &http.Client{},
 | |
| 		sourcesReady:                   config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
 | |
| 		registerNode:                   kubeCfg.RegisterNode,
 | |
| 		registerSchedulable:            kubeCfg.RegisterSchedulable,
 | |
| 		standaloneMode:                 standaloneMode,
 | |
| 		clusterDomain:                  kubeCfg.ClusterDomain,
 | |
| 		clusterDNS:                     net.ParseIP(kubeCfg.ClusterDNS),
 | |
| 		serviceLister:                  serviceLister,
 | |
| 		nodeLister:                     nodeLister,
 | |
| 		nodeInfo:                       nodeInfo,
 | |
| 		masterServiceNamespace:         kubeCfg.MasterServiceNamespace,
 | |
| 		streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
 | |
| 		recorder:                       kubeDeps.Recorder,
 | |
| 		cadvisor:                       kubeDeps.CAdvisorInterface,
 | |
| 		diskSpaceManager:               diskSpaceManager,
 | |
| 		cloud:                          kubeDeps.Cloud,
 | |
| 		autoDetectCloudProvider:   (componentconfigv1alpha1.AutoDetectCloudProvider == kubeCfg.CloudProvider),
 | |
| 		nodeRef:                   nodeRef,
 | |
| 		nodeLabels:                kubeCfg.NodeLabels,
 | |
| 		nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
 | |
| 		os:                kubeDeps.OSInterface,
 | |
| 		oomWatcher:        oomWatcher,
 | |
| 		cgroupsPerQOS:     kubeCfg.CgroupsPerQOS,
 | |
| 		cgroupRoot:        kubeCfg.CgroupRoot,
 | |
| 		mounter:           kubeDeps.Mounter,
 | |
| 		writer:            kubeDeps.Writer,
 | |
| 		nonMasqueradeCIDR: kubeCfg.NonMasqueradeCIDR,
 | |
| 		maxPods:           int(kubeCfg.MaxPods),
 | |
| 		podsPerCore:       int(kubeCfg.PodsPerCore),
 | |
| 		nvidiaGPUs:        int(kubeCfg.NvidiaGPUs),
 | |
| 		syncLoopMonitor:   atomic.Value{},
 | |
| 		resolverConfig:    kubeCfg.ResolverConfig,
 | |
| 		cpuCFSQuota:       kubeCfg.CPUCFSQuota,
 | |
| 		daemonEndpoints:   daemonEndpoints,
 | |
| 		containerManager:  kubeDeps.ContainerManager,
 | |
| 		nodeIP:            net.ParseIP(kubeCfg.NodeIP),
 | |
| 		clock:             clock.RealClock{},
 | |
| 		outOfDiskTransitionFrequency:            kubeCfg.OutOfDiskTransitionFrequency.Duration,
 | |
| 		reservation:                             *reservation,
 | |
| 		enableCustomMetrics:                     kubeCfg.EnableCustomMetrics,
 | |
| 		babysitDaemons:                          kubeCfg.BabysitDaemons,
 | |
| 		enableControllerAttachDetach:            kubeCfg.EnableControllerAttachDetach,
 | |
| 		iptClient:                               utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
 | |
| 		makeIPTablesUtilChains:                  kubeCfg.MakeIPTablesUtilChains,
 | |
| 		iptablesMasqueradeBit:                   int(kubeCfg.IPTablesMasqueradeBit),
 | |
| 		iptablesDropBit:                         int(kubeCfg.IPTablesDropBit),
 | |
| 		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
 | |
| 	}
 | |
| 
 | |
| 	secretManager, err := secret.NewCachingSecretManager(
 | |
| 		kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode))
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to initialize secret manager: %v", err)
 | |
| 	}
 | |
| 	klet.secretManager = secretManager
 | |
| 
 | |
| 	if klet.experimentalHostUserNamespaceDefaulting {
 | |
| 		glog.Infof("Experimental host user namespace defaulting is enabled.")
 | |
| 	}
 | |
| 
 | |
| 	if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(kubeCfg.HairpinMode), kubeCfg.ContainerRuntime, kubeCfg.NetworkPluginName); err != nil {
 | |
| 		// This is a non-recoverable error. Returning it up the callstack will just
 | |
| 		// lead to retries of the same failure, so just fail hard.
 | |
| 		glog.Fatalf("Invalid hairpin mode: %v", err)
 | |
| 	} else {
 | |
| 		klet.hairpinMode = mode
 | |
| 	}
 | |
| 	glog.Infof("Hairpin mode set to %q", klet.hairpinMode)
 | |
| 
 | |
| 	if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}, &network.NoopPortMappingGetter{}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
 | |
| 		return nil, err
 | |
| 	} else {
 | |
| 		klet.networkPlugin = plug
 | |
| 	}
 | |
| 
 | |
| 	machineInfo, err := klet.GetCachedMachineInfo()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	procFs := procfs.NewProcFS()
 | |
| 	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
 | |
| 
 | |
| 	klet.livenessManager = proberesults.NewManager()
 | |
| 
 | |
| 	klet.podCache = kubecontainer.NewCache()
 | |
| 	// podManager is also responsible for keeping secretManager contents up-to-date.
 | |
| 	klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager)
 | |
| 
 | |
| 	if kubeCfg.RemoteRuntimeEndpoint != "" {
 | |
| 		// kubeCfg.RemoteImageEndpoint is the same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
 | |
| 		if kubeCfg.RemoteImageEndpoint == "" {
 | |
| 			kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// TODO: These need to become arguments to a standalone docker shim.
 | |
| 	binDir := kubeCfg.CNIBinDir
 | |
| 	if binDir == "" {
 | |
| 		binDir = kubeCfg.NetworkPluginDir
 | |
| 	}
 | |
| 	pluginSettings := dockershim.NetworkPluginSettings{
 | |
| 		HairpinMode:       klet.hairpinMode,
 | |
| 		NonMasqueradeCIDR: klet.nonMasqueradeCIDR,
 | |
| 		PluginName:        kubeCfg.NetworkPluginName,
 | |
| 		PluginConfDir:     kubeCfg.CNIConfDir,
 | |
| 		PluginBinDir:      binDir,
 | |
| 		MTU:               int(kubeCfg.NetworkPluginMTU),
 | |
| 	}
 | |
| 
 | |
| 	// Remote runtime shim just cannot talk back to kubelet, so it doesn't
 | |
| 	// support bandwidth shaping or hostports till #35457. To enable legacy
 | |
| 	// features, replace with networkHost.
 | |
| 	var nl *noOpLegacyHost
 | |
| 	pluginSettings.LegacyRuntimeHost = nl
 | |
| 
 | |
| 	if kubeCfg.EnableCRI {
 | |
| 		// kubelet defers to the runtime shim to setup networking. Setting
 | |
| 		// this to nil will prevent it from trying to invoke the plugin.
 | |
| 		// It's easier to always probe and initialize plugins until CRI
 | |
| 		// becomes the default.
 | |
| 		klet.networkPlugin = nil
 | |
| 
 | |
| 		switch kubeCfg.ContainerRuntime {
 | |
| 		case "docker":
 | |
| 			// Create and start the CRI shim running as a grpc server.
 | |
| 			streamingConfig := getStreamingConfig(kubeCfg, kubeDeps)
 | |
| 			ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage,
 | |
| 				streamingConfig, &pluginSettings, kubeCfg.RuntimeCgroups, kubeCfg.CgroupDriver, dockerExecHandler)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			if err := ds.Start(); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			// For now, the CRI shim redirects the streaming requests to the
 | |
| 			// kubelet, which handles the requests using DockerService.
 | |
| 			klet.criHandler = ds
 | |
| 
 | |
| 			const (
 | |
| 				// The unix socket for kubelet <-> dockershim communication.
 | |
| 				ep = "/var/run/dockershim.sock"
 | |
| 			)
 | |
| 			kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RemoteImageEndpoint = ep, ep
 | |
| 
 | |
| 			glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
 | |
| 			server := dockerremote.NewDockerServer(ep, ds)
 | |
| 			if err := server.Start(); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		case "remote":
 | |
| 			// No-op.
 | |
| 			break
 | |
| 		default:
 | |
| 			return nil, fmt.Errorf("unsupported CRI runtime: %q", kubeCfg.ContainerRuntime)
 | |
| 		}
 | |
| 		runtimeService, imageService, err := getRuntimeAndImageServices(kubeCfg)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
 | |
| 			kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
 | |
| 			klet.livenessManager,
 | |
| 			containerRefManager,
 | |
| 			machineInfo,
 | |
| 			klet.podManager,
 | |
| 			kubeDeps.OSInterface,
 | |
| 			klet.networkPlugin,
 | |
| 			klet,
 | |
| 			klet.httpClient,
 | |
| 			imageBackOff,
 | |
| 			kubeCfg.SerializeImagePulls,
 | |
| 			float32(kubeCfg.RegistryPullQPS),
 | |
| 			int(kubeCfg.RegistryBurst),
 | |
| 			klet.cpuCFSQuota,
 | |
| 			runtimeService,
 | |
| 			imageService,
 | |
| 		)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		klet.containerRuntime = runtime
 | |
| 		klet.runner = runtime
 | |
| 	} else {
 | |
| 		switch kubeCfg.ContainerRuntime {
 | |
| 		case "docker":
 | |
| 			runtime := dockertools.NewDockerManager(
 | |
| 				kubeDeps.DockerClient,
 | |
| 				kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
 | |
| 				klet.livenessManager,
 | |
| 				containerRefManager,
 | |
| 				klet.podManager,
 | |
| 				machineInfo,
 | |
| 				kubeCfg.PodInfraContainerImage,
 | |
| 				float32(kubeCfg.RegistryPullQPS),
 | |
| 				int(kubeCfg.RegistryBurst),
 | |
| 				ContainerLogsDir,
 | |
| 				kubeDeps.OSInterface,
 | |
| 				klet.networkPlugin,
 | |
| 				klet,
 | |
| 				klet.httpClient,
 | |
| 				dockerExecHandler,
 | |
| 				kubeDeps.OOMAdjuster,
 | |
| 				procFs,
 | |
| 				klet.cpuCFSQuota,
 | |
| 				imageBackOff,
 | |
| 				kubeCfg.SerializeImagePulls,
 | |
| 				kubeCfg.EnableCustomMetrics,
 | |
| 				// If using "kubenet", the Kubernetes network plugin that wraps
 | |
| 				// CNI's bridge plugin, it knows how to set the hairpin veth flag
 | |
| 				// so we tell the container runtime to back away from setting it.
 | |
| 				// If the kubelet is started with any other plugin we can't be
 | |
| 				// sure it handles the hairpin case so we instruct the docker
 | |
| 				// runtime to set the flag instead.
 | |
| 				klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet",
 | |
| 				kubeCfg.SeccompProfileRoot,
 | |
| 				kubeDeps.ContainerRuntimeOptions...,
 | |
| 			)
 | |
| 			klet.containerRuntime = runtime
 | |
| 			klet.runner = kubecontainer.DirectStreamingRunner(runtime)
 | |
| 		case "rkt":
 | |
| 			// TODO: Include hairpin mode settings in rkt?
 | |
| 			conf := &rkt.Config{
 | |
| 				Path:            kubeCfg.RktPath,
 | |
| 				Stage1Image:     kubeCfg.RktStage1Image,
 | |
| 				InsecureOptions: "image,ondisk",
 | |
| 			}
 | |
| 			runtime, err := rkt.New(
 | |
| 				kubeCfg.RktAPIEndpoint,
 | |
| 				conf,
 | |
| 				klet,
 | |
| 				kubeDeps.Recorder,
 | |
| 				containerRefManager,
 | |
| 				klet.podManager,
 | |
| 				klet.livenessManager,
 | |
| 				klet.httpClient,
 | |
| 				klet.networkPlugin,
 | |
| 				klet.hairpinMode == componentconfig.HairpinVeth,
 | |
| 				utilexec.New(),
 | |
| 				kubecontainer.RealOS{},
 | |
| 				imageBackOff,
 | |
| 				kubeCfg.SerializeImagePulls,
 | |
| 				float32(kubeCfg.RegistryPullQPS),
 | |
| 				int(kubeCfg.RegistryBurst),
 | |
| 				kubeCfg.RuntimeRequestTimeout.Duration,
 | |
| 			)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			klet.containerRuntime = runtime
 | |
| 			klet.runner = kubecontainer.DirectStreamingRunner(runtime)
 | |
| 		default:
 | |
| 			return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
 | |
| 	klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime)
 | |
| 
 | |
| 	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
 | |
| 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
 | |
| 	klet.updatePodCIDR(kubeCfg.PodCIDR)
 | |
| 
 | |
| 	// setup containerGC
 | |
| 	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	klet.containerGC = containerGC
 | |
| 	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
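| 	// Note that the IntMax above guarantees the deletor tolerates at least
| 	// minDeadContainerInPod (1) dead container per pod, even when the GC
| 	// policy's MaxPerPodContainer is configured lower; e.g. IntMax(0, 1) == 1.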
 | |
| 
 | |
| 	// setup imageManager
 | |
| 	imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
 | |
| 	}
 | |
| 	klet.imageManager = imageManager
 | |
| 
 | |
| 	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager)
 | |
| 
 | |
| 	klet.probeManager = prober.NewManager(
 | |
| 		klet.statusManager,
 | |
| 		klet.livenessManager,
 | |
| 		klet.runner,
 | |
| 		containerRefManager,
 | |
| 		kubeDeps.Recorder)
 | |
| 
 | |
| 	klet.volumePluginMgr, err =
 | |
| 		NewInitializedVolumePluginMgr(klet, secretManager, kubeDeps.VolumePlugins)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// If the experimentalMounterPathFlag is set, we do not want to
 | |
| 	// check node capabilities since the mount path is not the default
 | |
| 	if len(kubeCfg.ExperimentalMounterPath) != 0 {
 | |
| 		kubeCfg.ExperimentalCheckNodeCapabilitiesBeforeMount = false
 | |
| 	}
 | |
| 	// setup volumeManager
 | |
| 	klet.volumeManager, err = volumemanager.NewVolumeManager(
 | |
| 		kubeCfg.EnableControllerAttachDetach,
 | |
| 		nodeName,
 | |
| 		klet.podManager,
 | |
| 		klet.kubeClient,
 | |
| 		klet.volumePluginMgr,
 | |
| 		klet.containerRuntime,
 | |
| 		kubeDeps.Mounter,
 | |
| 		klet.getPodsDir(),
 | |
| 		kubeDeps.Recorder,
 | |
| 		kubeCfg.ExperimentalCheckNodeCapabilitiesBeforeMount,
 | |
| 		kubeCfg.KeepTerminatedPodVolumes)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to initialize volume manager: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	klet.runtimeCache = runtimeCache
 | |
| 	klet.reasonCache = NewReasonCache()
 | |
| 	klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
 | |
| 	klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
 | |
| 
 | |
| 	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
 | |
| 	klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
 | |
| 	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
 | |
| 
 | |
| 	// setup eviction manager
 | |
| 	evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, kubeDeps.Recorder, nodeRef, klet.clock)
 | |
| 
 | |
| 	klet.evictionManager = evictionManager
 | |
| 	klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
 | |
| 
 | |
| 	// add sysctl admission
 | |
| 	runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	safeWhitelist, err := sysctl.NewWhitelist(sysctl.SafeSysctlWhitelist(), v1.SysctlsPodAnnotationKey)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec
 | |
| 	// Hence, we concatenate those two lists.
 | |
| 	safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), kubeCfg.AllowedUnsafeSysctls...)
 | |
| 	unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, v1.UnsafeSysctlsPodAnnotationKey)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
 | |
| 	klet.admitHandlers.AddPodAdmitHandler(safeWhitelist)
 | |
| 	klet.admitHandlers.AddPodAdmitHandler(unsafeWhitelist)
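| 	// Sketch of how this is driven in practice (best-effort illustration of the
| 	// alpha sysctl API of this era, not a guarantee): the extra entries come
| 	// from kubeCfg.AllowedUnsafeSysctls (the --experimental-allowed-unsafe-sysctls
| 	// flag), and pods opt in through the annotation named by
| 	// v1.UnsafeSysctlsPodAnnotationKey, e.g. requesting "kernel.shm_rmid_forced=1".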
 | |
| 
 | |
| 	// enable active deadline handler
 | |
| 	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
 | |
| 	klet.AddPodSyncHandler(activeDeadlineHandler)
 | |
| 
 | |
| 	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay))
 | |
| 	// apply functional Option's
 | |
| 	for _, opt := range kubeDeps.Options {
 | |
| 		opt(klet)
 | |
| 	}
 | |
| 
 | |
| 	klet.appArmorValidator = apparmor.NewValidator(kubeCfg.ContainerRuntime)
 | |
| 	klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
 | |
| 
 | |
| 	// Finally, put the most recent version of the config on the Kubelet, so
 | |
| 	// people can see how it was configured.
 | |
| 	klet.kubeletConfiguration = *kubeCfg
 | |
| 	return klet, nil
 | |
| }
 | |
| 
 | |
| type serviceLister interface {
 | |
| 	List(labels.Selector) ([]*v1.Service, error)
 | |
| }
 | |
| 
 | |
| type nodeLister interface {
 | |
| 	List() (machines v1.NodeList, err error)
 | |
| }
 | |
| 
 | |
| // Kubelet is the main kubelet implementation.
 | |
| type Kubelet struct {
 | |
| 	kubeletConfiguration componentconfig.KubeletConfiguration
 | |
| 
 | |
| 	hostname      string
 | |
| 	nodeName      types.NodeName
 | |
| 	dockerClient  dockertools.DockerInterface
 | |
| 	runtimeCache  kubecontainer.RuntimeCache
 | |
| 	kubeClient    clientset.Interface
 | |
| 	iptClient     utilipt.Interface
 | |
| 	rootDirectory string
 | |
| 
 | |
| 	// podWorkers handle syncing Pods in response to events.
 | |
| 	podWorkers PodWorkers
 | |
| 
 | |
| 	// resyncInterval is the interval between periodic full reconciliations of
 | |
| 	// pods on this node.
 | |
| 	resyncInterval time.Duration
 | |
| 
 | |
| 	// sourcesReady records the sources seen by the kubelet, it is thread-safe.
 | |
| 	sourcesReady config.SourcesReady
 | |
| 
 | |
| 	// podManager is a facade that abstracts away the various sources of pods
 | |
| 	// this Kubelet services.
 | |
| 	podManager kubepod.Manager
 | |
| 
 | |
| 	// Needed to observe and respond to situations that could impact node stability
 | |
| 	evictionManager eviction.Manager
 | |
| 
 | |
| 	// Needed to report events for containers belonging to deleted/modified pods.
 | |
| 	// Tracks references for reporting events
 | |
| 	containerRefManager *kubecontainer.RefManager
 | |
| 
 | |
| 	// Optional, defaults to /logs/ from /var/log
 | |
| 	logServer http.Handler
 | |
| 	// Optional, defaults to simple Docker implementation
 | |
| 	runner kubecontainer.ContainerCommandRunner
 | |
| 	// Optional, client for http requests, defaults to empty client
 | |
| 	httpClient kubetypes.HttpGetter
 | |
| 
 | |
| 	// cAdvisor used for container information.
 | |
| 	cadvisor cadvisor.Interface
 | |
| 
 | |
| 	// Set to true to have the node register itself with the apiserver.
 | |
| 	registerNode bool
 | |
| 	// Set to true to have the node register itself as schedulable.
 | |
| 	registerSchedulable bool
 | |
| 	// for internal bookkeeping; access only from within registerWithApiserver
 | |
| 	registrationCompleted bool
 | |
| 
 | |
| 	// Set to true if the kubelet is in standalone mode (i.e. setup without an apiserver)
 | |
| 	standaloneMode bool
 | |
| 
 | |
| 	// If non-empty, use this for container DNS search.
 | |
| 	clusterDomain string
 | |
| 
 | |
| 	// If non-nil, use this for container DNS server.
 | |
| 	clusterDNS net.IP
 | |
| 
 | |
| 	// masterServiceNamespace is the namespace that the master service is exposed in.
 | |
| 	masterServiceNamespace string
 | |
| 	// serviceLister knows how to list services
 | |
| 	serviceLister serviceLister
 | |
| 	// nodeLister knows how to list nodes
 | |
| 	nodeLister nodeLister
 | |
| 	// nodeInfo knows how to get information about the node for this kubelet.
 | |
| 	nodeInfo predicates.NodeInfo
 | |
| 
 | |
| 	// a list of node labels to register
 | |
| 	nodeLabels map[string]string
 | |
| 
 | |
| 	// Last timestamp when runtime responded on ping.
 | |
| 	// Mutex is used to protect this value.
 | |
| 	runtimeState *runtimeState
 | |
| 
 | |
| 	// Volume plugins.
 | |
| 	volumePluginMgr *volume.VolumePluginMgr
 | |
| 
 | |
| 	// Network plugin.
 | |
| 	networkPlugin network.NetworkPlugin
 | |
| 
 | |
| 	// Handles container probing.
 | |
| 	probeManager prober.Manager
 | |
| 	// Manages container health check results.
 | |
| 	livenessManager proberesults.Manager
 | |
| 
 | |
| 	// How long to keep idle streaming command execution/port forwarding
 | |
| 	// connections open before terminating them
 | |
| 	streamingConnectionIdleTimeout time.Duration
 | |
| 
 | |
| 	// The EventRecorder to use
 | |
| 	recorder record.EventRecorder
 | |
| 
 | |
| 	// Policy for handling garbage collection of dead containers.
 | |
| 	containerGC kubecontainer.ContainerGC
 | |
| 
 | |
| 	// Manager for image garbage collection.
 | |
| 	imageManager images.ImageGCManager
 | |
| 
 | |
| 	// Diskspace manager.
 | |
| 	diskSpaceManager diskSpaceManager
 | |
| 
 | |
| 	// Secret manager.
 | |
| 	secretManager secret.Manager
 | |
| 
 | |
| 	// Cached MachineInfo returned by cadvisor.
 | |
| 	machineInfo *cadvisorapi.MachineInfo
 | |
| 
 | |
| 	// Syncs pods statuses with apiserver; also used as a cache of statuses.
 | |
| 	statusManager status.Manager
 | |
| 
 | |
| 	// VolumeManager runs a set of asynchronous loops that figure out which
 | |
| 	// volumes need to be attached/mounted/unmounted/detached based on the pods
 | |
| 	// scheduled on this node and makes it so.
 | |
| 	volumeManager volumemanager.VolumeManager
 | |
| 
 | |
| 	// Cloud provider interface.
 | |
| 	cloud                   cloudprovider.Interface
 | |
| 	autoDetectCloudProvider bool
 | |
| 
 | |
| 	// Reference to this node.
 | |
| 	nodeRef *clientv1.ObjectReference
 | |
| 
 | |
| 	// Container runtime.
 | |
| 	containerRuntime kubecontainer.Runtime
 | |
| 
 | |
| 	// reasonCache caches the failure reason of the last creation of all containers, which is
 | |
| 	// used for generating ContainerStatus.
 | |
| 	reasonCache *ReasonCache
 | |
| 
 | |
| 	// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
 | |
| 	// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
 | |
| 	// in nodecontroller. There are several constraints:
 | |
| 	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
 | |
| 	//    N means the number of retries allowed for the kubelet to post node status. It is pointless
 | |
| 	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
 | |
| 	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
 | |
| 	//    The constant must be less than podEvictionTimeout.
 | |
| 	// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
 | |
| 	//    status. Kubelet may fail to update node status reliably if the value is too small,
 | |
| 	//    as it takes time to gather all necessary node information.
 | |
| 	nodeStatusUpdateFrequency time.Duration
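| 	// Worked example of constraint 1 (the figures are the commonly cited
| 	// defaults of this era, given for illustration): with
| 	// nodeStatusUpdateFrequency=10s and nodeMonitorGracePeriod=40s the node
| 	// controller tolerates roughly 40s/10s = 4 missed updates before treating
| 	// the node as unhealthy.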
 | |
| 
 | |
| 	// Generates pod events.
 | |
| 	pleg pleg.PodLifecycleEventGenerator
 | |
| 
 | |
| 	// Store kubecontainer.PodStatus for all pods.
 | |
| 	podCache kubecontainer.Cache
 | |
| 
 | |
| 	// os is a facade for various syscalls that need to be mocked during testing.
 | |
| 	os kubecontainer.OSInterface
 | |
| 
 | |
| 	// Watcher of out of memory events.
 | |
| 	oomWatcher OOMWatcher
 | |
| 
 | |
| 	// Monitor resource usage
 | |
| 	resourceAnalyzer stats.ResourceAnalyzer
 | |
| 
 | |
| 	// Whether or not we should have the QOS cgroup hierarchy for resource management
 | |
| 	cgroupsPerQOS bool
 | |
| 
 | |
| 	// If non-empty, pass this to the container runtime as the root cgroup.
 | |
| 	cgroupRoot string
 | |
| 
 | |
| 	// Mounter to use for volumes.
 | |
| 	mounter mount.Interface
 | |
| 
 | |
| 	// Writer interface to use for volumes.
 | |
| 	writer kubeio.Writer
 | |
| 
 | |
| 	// Manager of non-Runtime containers.
 | |
| 	containerManager cm.ContainerManager
 | |
| 	nodeConfig       cm.NodeConfig
 | |
| 
 | |
| 	// Traffic to IPs outside this range will use IP masquerade.
 | |
| 	nonMasqueradeCIDR string
 | |
| 
 | |
| 	// Maximum number of pods that can be run by this Kubelet
 | |
| 	maxPods int
 | |
| 
 | |
| 	// Number of NVIDIA GPUs on this node
 | |
| 	nvidiaGPUs int
 | |
| 
 | |
| 	// Monitor Kubelet's sync loop
 | |
| 	syncLoopMonitor atomic.Value
 | |
| 
 | |
| 	// Container restart Backoff
 | |
| 	backOff *flowcontrol.Backoff
 | |
| 
 | |
| 	// Channel for sending pods to kill.
 | |
| 	podKillingCh chan *kubecontainer.PodPair
 | |
| 
 | |
| 	// The configuration file used as the base to generate the container's
 | |
| 	// DNS resolver configuration file. This can be used in conjunction with
 | |
| 	// clusterDomain and clusterDNS.
 | |
| 	resolverConfig string
 | |
| 
 | |
| 	// Optionally shape the bandwidth of a pod
 | |
| 	// TODO: remove when kubenet plugin is ready
 | |
| 	shaper bandwidth.BandwidthShaper
 | |
| 
 | |
| 	// True if container cpu limits should be enforced via cgroup CFS quota
 | |
| 	cpuCFSQuota bool
 | |
| 
 | |
| 	// Information about the ports which are opened by daemons on Node running this Kubelet server.
 | |
| 	daemonEndpoints *v1.NodeDaemonEndpoints
 | |
| 
 | |
| 	// A queue used to trigger pod workers.
 | |
| 	workQueue queue.WorkQueue
 | |
| 
 | |
| 	// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
 | |
| 	oneTimeInitializer sync.Once
 | |
| 
 | |
| 	// If non-nil, use this IP address for the node
 | |
| 	nodeIP net.IP
 | |
| 
 | |
| 	// clock is an interface that provides time related functionality in a way that makes it
 | |
| 	// easy to test the code.
 | |
| 	clock clock.Clock
 | |
| 
 | |
| 	// outOfDiskTransitionFrequency specifies the amount of time the kubelet has to be actually
 | |
| 	// not out of disk before it can transition the node condition status from out-of-disk to
 | |
| 	// not-out-of-disk. This prevents a pod that causes out-of-disk condition from repeatedly
 | |
| 	// getting rescheduled onto the node.
 | |
| 	outOfDiskTransitionFrequency time.Duration
 | |
| 
 | |
| 	// reservation specifies resources which are reserved for non-pod usage, including kubernetes and
 | |
| 	// non-kubernetes system processes.
 | |
| 	reservation kubetypes.Reservation
 | |
| 
 | |
| 	// support gathering custom metrics.
 | |
| 	enableCustomMetrics bool
 | |
| 
 | |
| 	// How the Kubelet should setup hairpin NAT. Can take the values: "promiscuous-bridge"
 | |
| 	// (make cbr0 promiscuous), "hairpin-veth" (set the hairpin flag on veth interfaces)
 | |
| 	// or "none" (do nothing).
 | |
| 	hairpinMode componentconfig.HairpinMode
 | |
| 
 | |
| 	// The node has a babysitter process monitoring docker and the kubelet
 | |
| 	babysitDaemons bool
 | |
| 
 | |
| 	// handlers called during the tryUpdateNodeStatus cycle
 | |
| 	setNodeStatusFuncs []func(*v1.Node) error
 | |
| 
 | |
| 	// TODO: think about moving this to be centralized in PodWorkers in follow-on.
 | |
| 	// the list of handlers to call during pod admission.
 | |
| 	admitHandlers lifecycle.PodAdmitHandlers
 | |
| 
 | |
| 	// softAdmitHandlers are applied to the pod after it is admitted by the Kubelet, but before it is
 | |
| 	// run. A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely. If a
 | |
| 	// rejected pod should not be recreated, or the scheduler is not aware of the rejection rule, the
 | |
| 	// admission rule should be applied by a softAdmitHandler.
 | |
| 	softAdmitHandlers lifecycle.PodAdmitHandlers
 | |
| 
 | |
| 	// the list of handlers to call during pod sync loop.
 | |
| 	lifecycle.PodSyncLoopHandlers
 | |
| 
 | |
| 	// the list of handlers to call during pod sync.
 | |
| 	lifecycle.PodSyncHandlers
 | |
| 
 | |
| 	// the number of allowed pods per core
 | |
| 	podsPerCore int
 | |
| 
 | |
| 	// enableControllerAttachDetach indicates the Attach/Detach controller
 | |
| 	// should manage attachment/detachment of volumes scheduled to this node,
 | |
| 	// and disable kubelet from executing any attach/detach operations
 | |
| 	enableControllerAttachDetach bool
 | |
| 
 | |
| 	// trigger deleting containers in a pod
 | |
| 	containerDeletor *podContainerDeletor
 | |
| 
 | |
| 	// config iptables util rules
 | |
| 	makeIPTablesUtilChains bool
 | |
| 
 | |
| 	// The bit of the fwmark space to mark packets for SNAT.
 | |
| 	iptablesMasqueradeBit int
 | |
| 
 | |
| 	// The bit of the fwmark space to mark packets for dropping.
 | |
| 	iptablesDropBit int
 | |
| 
 | |
| 	// The AppArmor validator for checking whether AppArmor is supported.
 | |
| 	appArmorValidator apparmor.Validator
 | |
| 
 | |
| 	// The handler serving CRI streaming calls (exec/attach/port-forward).
 | |
| 	criHandler http.Handler
 | |
| 
 | |
| 	// experimentalHostUserNamespaceDefaulting sets userns=true when users request host namespaces (pid, ipc, net),
 | |
| 	// are using non-namespaced capabilities (mknod, sys_time, sys_module), the pod contains a privileged container,
 | |
| 	// or using host path volumes.
 | |
| 	// This should only be enabled when the container runtime is performing user remapping AND if the
 | |
| 	// experimental behavior is desired.
 | |
| 	experimentalHostUserNamespaceDefaulting bool
 | |
| }
 | |
| 
 | |
| // setupDataDirs creates:
 | |
| // 1.  the root directory
 | |
| // 2.  the pods directory
 | |
| // 3.  the plugins directory
 | |
| func (kl *Kubelet) setupDataDirs() error {
 | |
| 	kl.rootDirectory = path.Clean(kl.rootDirectory)
 | |
| 	if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
 | |
| 		return fmt.Errorf("error creating root directory: %v", err)
 | |
| 	}
 | |
| 	if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
 | |
| 		return fmt.Errorf("error creating pods directory: %v", err)
 | |
| 	}
 | |
| 	if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
 | |
| 		return fmt.Errorf("error creating plugins directory: %v", err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
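| // For example, with the conventional default root directory /var/lib/kubelet
| // (a deployment convention, not enforced here), setupDataDirs creates
| // /var/lib/kubelet, /var/lib/kubelet/pods and /var/lib/kubelet/plugins, each
| // with mode 0750.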
 | |
| 
 | |
| // Starts garbage collection threads.
 | |
| func (kl *Kubelet) StartGarbageCollection() {
 | |
| 	loggedContainerGCFailure := false
 | |
| 	go wait.Until(func() {
 | |
| 		if err := kl.containerGC.GarbageCollect(kl.sourcesReady.AllReady()); err != nil {
 | |
| 			glog.Errorf("Container garbage collection failed: %v", err)
 | |
| 			kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
 | |
| 			loggedContainerGCFailure = true
 | |
| 		} else {
 | |
| 			var vLevel glog.Level = 4
 | |
| 			if loggedContainerGCFailure {
 | |
| 				vLevel = 1
 | |
| 				loggedContainerGCFailure = false
 | |
| 			}
 | |
| 
 | |
| 			glog.V(vLevel).Infof("Container garbage collection succeeded")
 | |
| 		}
 | |
| 	}, ContainerGCPeriod, wait.NeverStop)
 | |
| 
 | |
| 	loggedImageGCFailure := false
 | |
| 	go wait.Until(func() {
 | |
| 		if err := kl.imageManager.GarbageCollect(); err != nil {
 | |
| 			glog.Errorf("Image garbage collection failed: %v", err)
 | |
| 			kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
 | |
| 			loggedImageGCFailure = true
 | |
| 		} else {
 | |
| 			var vLevel glog.Level = 4
 | |
| 			if loggedImageGCFailure {
 | |
| 				vLevel = 1
 | |
| 				loggedImageGCFailure = false
 | |
| 			}
 | |
| 
 | |
| 			glog.V(vLevel).Infof("Image garbage collection succeeded")
 | |
| 		}
 | |
| 	}, ImageGCPeriod, wait.NeverStop)
 | |
| }
 | |
| 
 | |
| // initializeModules will initialize internal modules that do not require the container runtime to be up.
 | |
| // Note that the modules here must not depend on modules that are not initialized here.
 | |
| func (kl *Kubelet) initializeModules() error {
 | |
| 	// Step 1: Prometheus metrics.
 | |
| 	metrics.Register(kl.runtimeCache)
 | |
| 
 | |
| 	// Step 2: Setup filesystem directories.
 | |
| 	if err := kl.setupDataDirs(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Step 3: If the container logs directory does not exist, create it.
 | |
| 	if _, err := os.Stat(ContainerLogsDir); err != nil {
 | |
| 		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
 | |
| 			glog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Step 4: Start the image manager.
 | |
| 	kl.imageManager.Start()
 | |
| 
 | |
| 	// Step 5: Start container manager.
 | |
| 	node, err := kl.getNodeAnyWay()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("Kubelet failed to get node info: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	if err := kl.containerManager.Start(node); err != nil {
 | |
| 		return fmt.Errorf("Failed to start ContainerManager %v", err)
 | |
| 	}
 | |
| 
 | |
| 	// Step 6: Start out of memory watcher.
 | |
| 	if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
 | |
| 		return fmt.Errorf("Failed to start OOM watcher %v", err)
 | |
| 	}
 | |
| 
 | |
| 	// Step 7: Start resource analyzer
 | |
| 	kl.resourceAnalyzer.Start()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
 | |
| func (kl *Kubelet) initializeRuntimeDependentModules() {
 | |
| 	if err := kl.cadvisor.Start(); err != nil {
 | |
| 		// Fail kubelet and rely on the babysitter to retry starting kubelet.
 | |
| 		// TODO(random-liu): Add backoff logic in the babysitter
 | |
| 		glog.Fatalf("Failed to start cAdvisor %v", err)
 | |
| 	}
 | |
| 	// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
 | |
| 	kl.evictionManager.Start(kl, kl.getActivePods, evictionMonitoringPeriod)
 | |
| }
 | |
| 
 | |
| // Run starts the kubelet reacting to config updates
 | |
| func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 | |
| 	if kl.logServer == nil {
 | |
| 		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
 | |
| 	}
 | |
| 	if kl.kubeClient == nil {
 | |
| 		glog.Warning("No api server defined - no node status update will be sent.")
 | |
| 	}
 | |
| 	if err := kl.initializeModules(); err != nil {
 | |
| 		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
 | |
| 		glog.Error(err)
 | |
| 		kl.runtimeState.setInitError(err)
 | |
| 	}
 | |
| 
 | |
| 	// Start volume manager
 | |
| 	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
 | |
| 
 | |
| 	if kl.kubeClient != nil {
 | |
| 		// Start syncing node status immediately, this may set up things the runtime needs to run.
 | |
| 		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
 | |
| 	}
 | |
| 	go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
 | |
| 	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
 | |
| 
 | |
| 	// Start loop to sync iptables util rules
 | |
| 	if kl.makeIPTablesUtilChains {
 | |
| 		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
 | |
| 	}
 | |
| 
 | |
| 	// Start a goroutine responsible for killing pods (that are not properly
 | |
| 	// handled by pod workers).
 | |
| 	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
 | |
| 
 | |
| 	// Start a goroutine responsible for checking limits in resolv.conf
 | |
| 	if kl.resolverConfig != "" {
 | |
| 		go wait.Until(func() { kl.checkLimitsForResolvConf() }, 30*time.Second, wait.NeverStop)
 | |
| 	}
 | |
| 
 | |
| 	// Start component sync loops.
 | |
| 	kl.statusManager.Start()
 | |
| 	kl.probeManager.Start()
 | |
| 
 | |
| 	// Start the pod lifecycle event generator.
 | |
| 	kl.pleg.Start()
 | |
| 	kl.syncLoop(updates, kl)
 | |
| }
 | |
| 
 | |
| // GetKubeClient returns the Kubernetes client.
 | |
| // TODO: This is currently only required by network plugins. Replace
 | |
| // with more specific methods.
 | |
| func (kl *Kubelet) GetKubeClient() clientset.Interface {
 | |
| 	return kl.kubeClient
 | |
| }
 | |
| 
 | |
| // GetClusterDNS returns a list of the DNS servers and a list of the DNS search
 | |
| // domains of the cluster.
 | |
| func (kl *Kubelet) GetClusterDNS(pod *v1.Pod) ([]string, []string, error) {
 | |
| 	var hostDNS, hostSearch []string
 | |
| 	// Get host DNS settings
 | |
| 	if kl.resolverConfig != "" {
 | |
| 		f, err := os.Open(kl.resolverConfig)
 | |
| 		if err != nil {
 | |
| 			return nil, nil, err
 | |
| 		}
 | |
| 		defer f.Close()
 | |
| 
 | |
| 		hostDNS, hostSearch, err = kl.parseResolvConf(f)
 | |
| 		if err != nil {
 | |
| 			return nil, nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	useClusterFirstPolicy := pod.Spec.DNSPolicy == v1.DNSClusterFirst
 | |
| 	if useClusterFirstPolicy && kl.clusterDNS == nil {
 | |
| 		// clusterDNS is not known.
 | |
| 		// a pod with the ClusterFirst DNS policy cannot be created as requested.
 | |
| 		kl.recorder.Eventf(pod, v1.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
 | |
| 		log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
 | |
| 		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, "MissingClusterDNS", log)
 | |
| 
 | |
| 		// fallback to DNSDefault
 | |
| 		useClusterFirstPolicy = false
 | |
| 	}
 | |
| 
 | |
| 	if !useClusterFirstPolicy {
 | |
| 		// When the kubelet --resolv-conf flag is set to the empty string, use
 | |
| 		// DNS settings that override the docker default (which is to use
 | |
| 		// /etc/resolv.conf) and effectively disable DNS lookups. According to
 | |
| 		// the bind documentation, the behavior of the DNS client library when
 | |
| 		// "nameservers" are not specified is to "use the nameserver on the
 | |
| 		// local machine". A nameserver setting of localhost is equivalent to
 | |
| 		// this documented behavior.
 | |
| 		if kl.resolverConfig == "" {
 | |
| 			hostDNS = []string{"127.0.0.1"}
 | |
| 			hostSearch = []string{"."}
 | |
| 		} else {
 | |
| 			hostSearch = kl.formDNSSearchForDNSDefault(hostSearch, pod)
 | |
| 		}
 | |
| 		return hostDNS, hostSearch, nil
 | |
| 	}
 | |
| 
 | |
| 	// for a pod with DNSClusterFirst policy, the cluster DNS server is the only nameserver configured for
 | |
| 	// the pod. The cluster DNS server itself will forward queries to other nameservers that it is configured to use,
 | |
| 	// in case the cluster DNS server cannot resolve the DNS query itself
 | |
| 	dns := []string{kl.clusterDNS.String()}
 | |
| 	dnsSearch := kl.formDNSSearch(hostSearch, pod)
 | |
| 	return dns, dnsSearch, nil
 | |
| }
 | |
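| 
| // A hedged sketch of how a caller could render the values returned by
| // GetClusterDNS into resolv.conf-style lines (the rendering below is purely
| // illustrative; the kubelet assembles the real file elsewhere):
| //
| //	nameservers, searches, err := kl.GetClusterDNS(pod)
| //	if err == nil {
| //		for _, ns := range nameservers {
| //			fmt.Printf("nameserver %s\n", ns)
| //		}
| //		fmt.Printf("search %s\n", strings.Join(searches, " "))
| //	}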
| 
 | |
| // syncPod is the transaction script for the sync of a single pod.
 | |
| //
 | |
| // Arguments:
 | |
| //
 | |
| // o - the SyncPodOptions for this invocation
 | |
| //
 | |
| // The workflow is:
 | |
| // * If the pod is being created, record pod worker start latency
 | |
| // * Call generateAPIPodStatus to prepare a v1.PodStatus for the pod
 | |
| // * If the pod is being seen as running for the first time, record pod
 | |
| //   start latency
 | |
| // * Update the status of the pod in the status manager
 | |
| // * Kill the pod if it should not be running
 | |
| // * Create a mirror pod if the pod is a static pod, and does not
 | |
| //   already have a mirror pod
 | |
| // * Create the data directories for the pod if they do not exist
 | |
| // * Wait for volumes to attach/mount
 | |
| // * Fetch the pull secrets for the pod
 | |
| // * Call the container runtime's SyncPod callback
 | |
| // * Update the traffic shaping for the pod's ingress and egress limits
 | |
| //
 | |
| // If any step of this workflow errors, the error is returned, and is repeated
 | |
| // on the next syncPod call.
 | |
| func (kl *Kubelet) syncPod(o syncPodOptions) error {
 | |
| 	// pull out the required options
 | |
| 	pod := o.pod
 | |
| 	mirrorPod := o.mirrorPod
 | |
| 	podStatus := o.podStatus
 | |
| 	updateType := o.updateType
 | |
| 
 | |
| 	// if we want to kill a pod, do it now!
 | |
| 	if updateType == kubetypes.SyncPodKill {
 | |
| 		killPodOptions := o.killPodOptions
 | |
| 		if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
 | |
| 			return fmt.Errorf("kill pod options are required if update type is kill")
 | |
| 		}
 | |
| 		apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
 | |
| 		kl.statusManager.SetPodStatus(pod, apiPodStatus)
 | |
| 		// we kill the pod with the specified grace period since this is a termination
 | |
| 		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
 | |
| 			// there was an error killing the pod, so we return that error directly
 | |
| 			utilruntime.HandleError(err)
 | |
| 			return err
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Latency measurements for the main workflow are relative to the
 | |
| 	// first time the pod was seen by the API server.
 | |
| 	var firstSeenTime time.Time
 | |
| 	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
 | |
| 		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
 | |
| 	}
 | |
| 
 | |
| 	// Record pod worker start latency if being created
 | |
| 	// TODO: make pod workers record their own latencies
 | |
| 	if updateType == kubetypes.SyncPodCreate {
 | |
| 		if !firstSeenTime.IsZero() {
 | |
| 			// This is the first time we are syncing the pod. Record the latency
 | |
| 			// since kubelet first saw the pod if firstSeenTime is set.
 | |
| 			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
 | |
| 		} else {
 | |
| 			glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Generate final API pod status with pod and status manager status
 | |
| 	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
 | |
| 	// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
 | |
| 	// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
 | |
| 	// set pod IP to hostIP directly in runtime.GetPodStatus
 | |
| 	podStatus.IP = apiPodStatus.PodIP
 | |
| 
 | |
| 	// Record the time it takes for the pod to become running.
 | |
| 	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
 | |
| 	if (!ok || existingStatus.Phase == v1.PodPending) && apiPodStatus.Phase == v1.PodRunning &&
 | |
| 		!firstSeenTime.IsZero() {
 | |
| 		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
 | |
| 	}
 | |
| 
 | |
| 	runnable := kl.canRunPod(pod)
 | |
| 	if !runnable.Admit {
 | |
| 		// Pod is not runnable; update the Pod and Container statuses to explain why.
 | |
| 		apiPodStatus.Reason = runnable.Reason
 | |
| 		apiPodStatus.Message = runnable.Message
 | |
| 		// Containers in a waiting state are not being created; mark them as blocked.
 | |
| 		const waitingReason = "Blocked"
 | |
| 		for _, cs := range apiPodStatus.InitContainerStatuses {
 | |
| 			if cs.State.Waiting != nil {
 | |
| 				cs.State.Waiting.Reason = waitingReason
 | |
| 			}
 | |
| 		}
 | |
| 		for _, cs := range apiPodStatus.ContainerStatuses {
 | |
| 			if cs.State.Waiting != nil {
 | |
| 				cs.State.Waiting.Reason = waitingReason
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Update status in the status manager
 | |
| 	kl.statusManager.SetPodStatus(pod, apiPodStatus)
 | |
| 
 | |
| 	// Kill pod if it should not be running
 | |
| 	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
 | |
| 		var syncErr error
 | |
| 		if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
 | |
| 			syncErr = fmt.Errorf("error killing pod: %v", err)
 | |
| 			utilruntime.HandleError(syncErr)
 | |
| 		} else {
 | |
| 			if !runnable.Admit {
 | |
| 				// There was no error killing the pod, but the pod cannot be run.
 | |
| 				// Return an error to signal that the sync loop should back off.
 | |
| 				syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
 | |
| 			}
 | |
| 		}
 | |
| 		return syncErr
 | |
| 	}
 | |
| 
 | |
| 	// If the network plugin is not ready, only start the pod if it uses the host network
 | |
| 	if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
 | |
| 		return fmt.Errorf("network is not ready: %v", rs)
 | |
| 	}
 | |
| 
 | |
| 	// Create Cgroups for the pod and apply resource parameters
 | |
| 	// to them if cgroup-per-qos flag is enabled.
 | |
| 	pcm := kl.containerManager.NewPodContainerManager()
 | |
| 	// If the pod has already been terminated then we need not create
 | |
| 	// or update the pod's cgroup
 | |
| 	if !kl.podIsTerminated(pod) {
 | |
| 		// When the kubelet is restarted with the cgroup-per-qos
 | |
| 		// flag enabled, all the pod's running containers
 | |
| 		// should be killed and brought back up
 | |
| 		// under the qos cgroup hierarchy.
 | |
| 		// Check if this is the pod's first sync
 | |
| 		firstSync := true
 | |
| 		for _, containerStatus := range apiPodStatus.ContainerStatuses {
 | |
| 			if containerStatus.State.Running != nil {
 | |
| 				firstSync = false
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 		// Don't kill containers in pod if pod's cgroups already
 | |
| 		// exist or the pod is running for the first time
 | |
| 		podKilled := false
 | |
| 		if !pcm.Exists(pod) && !firstSync {
 | |
| 			kl.killPod(pod, nil, podStatus, nil)
 | |
| 			podKilled = true
 | |
| 		}
 | |
| 		// Create and update the pod's cgroups.
| 		// Don't create cgroups for a run-once pod if it was killed above.
| 		// The current policy is not to restart run-once pods when the kubelet
| 		// is restarted with the new flag: run-once pods are expected to run
| 		// only once, so if the kubelet is restarted they are not expected to
| 		// run again. We therefore skip creating and applying cgroup updates
| 		// for a run-once pod that was killed above.
 | |
| 		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
 | |
| 			if err := pcm.EnsureExists(pod); err != nil {
 | |
| 				return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Create Mirror Pod for Static Pod if it doesn't already exist
 | |
| 	if kubepod.IsStaticPod(pod) {
 | |
| 		podFullName := kubecontainer.GetPodFullName(pod)
 | |
| 		deleted := false
 | |
| 		if mirrorPod != nil {
 | |
| 			if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
 | |
| 				// The mirror pod is semantically different from the static pod. Remove
 | |
| 				// it. The mirror pod will get recreated later.
 | |
| 				glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
 | |
| 				if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
 | |
| 					glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
 | |
| 				} else {
 | |
| 					deleted = true
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 		if mirrorPod == nil || deleted {
 | |
| 			glog.V(3).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
 | |
| 			if err := kl.podManager.CreateMirrorPod(pod); err != nil {
 | |
| 				glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Make data directories for the pod
 | |
| 	if err := kl.makePodDataDirs(pod); err != nil {
 | |
| 		glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Wait for volumes to attach/mount
 | |
| 	if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
 | |
| 		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
 | |
| 		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Fetch the pull secrets for the pod
 | |
| 	pullSecrets, err := kl.getPullSecretsForPod(pod)
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Call the container runtime's SyncPod callback
 | |
| 	result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
 | |
| 	kl.reasonCache.Update(pod.UID, result)
 | |
| 	if err = result.Error(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// early successful exit if pod is not bandwidth-constrained
 | |
| 	if !kl.shapingEnabled() {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Update the traffic shaping for the pod's ingress and egress limits
 | |
| 	ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if egress != nil || ingress != nil {
 | |
| 		if kubecontainer.IsHostNetworkPod(pod) {
 | |
| 			kl.recorder.Event(pod, v1.EventTypeWarning, events.HostNetworkNotSupported, "Bandwidth shaping is not currently supported on the host network")
 | |
| 		} else if kl.shaper != nil {
 | |
| 			if len(apiPodStatus.PodIP) > 0 {
 | |
| 				err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", apiPodStatus.PodIP), egress, ingress)
 | |
| 			}
 | |
| 		} else {
 | |
| 			kl.recorder.Event(pod, v1.EventTypeWarning, events.UndefinedShaper, "Pod requests bandwidth shaping, but the shaper is undefined")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
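| 
| // Illustrative sketch of the SyncPodKill path handled at the top of syncPod.
| // The field names mirror the ones read from syncPodOptions above; the struct
| // literal, the KillPodOptions type name, and the status values are
| // assumptions for illustration, not a real call site:
| //
| //	err := kl.syncPod(syncPodOptions{
| //		pod:        pod,
| //		podStatus:  podStatus,
| //		updateType: kubetypes.SyncPodKill,
| //		killPodOptions: &KillPodOptions{
| //			PodStatusFunc: func(p *v1.Pod, s *kubecontainer.PodStatus) v1.PodStatus {
| //				return v1.PodStatus{Phase: v1.PodFailed, Reason: "Evicted"}
| //			},
| //		},
| //	})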
| 
 | |
| // Get pods which should be resynchronized. Currently, the following pods should be resynchronized:
| //   * pods whose work is ready.
| //   * pods for which an internal module has requested a sync.
 | |
| func (kl *Kubelet) getPodsToSync() []*v1.Pod {
 | |
| 	allPods := kl.podManager.GetPods()
 | |
| 	podUIDs := kl.workQueue.GetWork()
 | |
| 	podUIDSet := sets.NewString()
 | |
| 	for _, podUID := range podUIDs {
 | |
| 		podUIDSet.Insert(string(podUID))
 | |
| 	}
 | |
| 	var podsToSync []*v1.Pod
 | |
| 	for _, pod := range allPods {
 | |
| 		if podUIDSet.Has(string(pod.UID)) {
 | |
| 			// The work of the pod is ready
 | |
| 			podsToSync = append(podsToSync, pod)
 | |
| 			continue
 | |
| 		}
 | |
| 		for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
 | |
| 			if podSyncLoopHandler.ShouldSync(pod) {
 | |
| 				podsToSync = append(podsToSync, pod)
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return podsToSync
 | |
| }
 | |
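| 
| // The filtering above is a plain set-membership check. A minimal sketch of
| // the same idea using the apimachinery string set (the UIDs and pod slice
| // below are hypothetical, not kubelet state):
| //
| //	ready := sets.NewString("uid-1", "uid-3")
| //	for _, p := range pods {
| //		if ready.Has(string(p.UID)) {
| //			// p's work is ready; include it in the pods to sync
| //		}
| //	}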
| 
 | |
| // deletePod deletes the pod from the internal state of the kubelet by:
 | |
| // 1.  stopping the associated pod worker asynchronously
 | |
| // 2.  signaling to kill the pod by sending on the podKillingCh channel
 | |
| //
 | |
| // deletePod returns an error if not all sources are ready or the pod is not
 | |
| // found in the runtime cache.
 | |
| func (kl *Kubelet) deletePod(pod *v1.Pod) error {
 | |
| 	if pod == nil {
 | |
| 		return fmt.Errorf("deletePod does not allow nil pod")
 | |
| 	}
 | |
| 	if !kl.sourcesReady.AllReady() {
 | |
| 		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
 | |
| 		// for sources that haven't reported yet.
 | |
| 		return fmt.Errorf("skipping delete because sources aren't ready yet")
 | |
| 	}
 | |
| 	kl.podWorkers.ForgetWorker(pod.UID)
 | |
| 
 | |
| 	// Runtime cache may not have been updated with the pod, but it's okay
 | |
| 	// because the periodic cleanup routine will attempt to delete again later.
 | |
| 	runningPods, err := kl.runtimeCache.GetPods()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("error listing containers: %v", err)
 | |
| 	}
 | |
| 	runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
 | |
| 	if runningPod.IsEmpty() {
 | |
| 		return fmt.Errorf("pod not found")
 | |
| 	}
 | |
| 	podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
 | |
| 
 | |
| 	kl.podKillingCh <- &podPair
 | |
| 	// TODO: delete the mirror pod here?
 | |
| 
 | |
| 	// We leave the volume/directory cleanup to the periodic cleanup routine.
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // isOutOfDisk detects if pods can't fit due to lack of disk space.
 | |
| func (kl *Kubelet) isOutOfDisk() bool {
 | |
| 	// Check disk space once globally and reject or accept all new pods.
 | |
| 	withinBounds, err := kl.diskSpaceManager.IsRuntimeDiskSpaceAvailable()
 | |
| 	// Assume enough space in case of errors.
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Failed to check if disk space is available for the runtime: %v", err)
 | |
| 	} else if !withinBounds {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	withinBounds, err = kl.diskSpaceManager.IsRootDiskSpaceAvailable()
 | |
| 	// Assume enough space in case of errors.
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Failed to check if disk space is available on the root partition: %v", err)
 | |
| 	} else if !withinBounds {
 | |
| 		return true
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // rejectPod records an event about the pod with the given reason and message,
 | |
| // and updates the pod to the failed phase in the status manager.
 | |
| func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
 | |
| 	kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
 | |
| 	kl.statusManager.SetPodStatus(pod, v1.PodStatus{
 | |
| 		Phase:   v1.PodFailed,
 | |
| 		Reason:  reason,
 | |
| 		Message: "Pod " + message})
 | |
| }
 | |
| 
 | |
| // canAdmitPod determines if a pod can be admitted, and gives a reason if it
 | |
| // cannot. "pod" is new pod, while "pods" are all admitted pods
 | |
| // The function returns a boolean value indicating whether the pod
 | |
| // can be admitted, a brief single-word reason and a message explaining why
 | |
| // the pod cannot be admitted.
 | |
| func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
 | |
| 	// the kubelet will invoke each pod admit handler in sequence
 | |
| 	// if any handler rejects, the pod is rejected.
 | |
| 	// TODO: move out of disk check into a pod admitter
 | |
| 	// TODO: out of resource eviction should have a pod admitter call-out
 | |
| 	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
 | |
| 	for _, podAdmitHandler := range kl.admitHandlers {
 | |
| 		if result := podAdmitHandler.Admit(attrs); !result.Admit {
 | |
| 			return false, result.Reason, result.Message
 | |
| 		}
 | |
| 	}
 | |
| 	// TODO: When disk space scheduling is implemented (#11976), remove the out-of-disk check here and
 | |
| 	// add the disk space predicate to predicates.GeneralPredicates.
 | |
| 	if kl.isOutOfDisk() {
 | |
| 		glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to OutOfDisk")
 | |
| 		return false, "OutOfDisk", "cannot be started due to lack of disk space."
 | |
| 	}
 | |
| 
 | |
| 	return true, "", ""
 | |
| }
 | |
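| 
| // A hedged sketch of what an admit handler consulted by canAdmitPod looks
| // like. Only the lifecycle.PodAdmitAttributes/PodAdmitResult contract is taken
| // from the code above; the handler type and its rejection rule are invented
| // for illustration:
| //
| //	type requireContainersHandler struct{}
| //
| //	func (requireContainersHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
| //		if len(attrs.Pod.Spec.Containers) == 0 {
| //			return lifecycle.PodAdmitResult{
| //				Admit:   false,
| //				Reason:  "NoContainers",
| //				Message: "pod spec declares no containers",
| //			}
| //		}
| //		return lifecycle.PodAdmitResult{Admit: true}
| //	}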
| 
 | |
| func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
 | |
| 	attrs := &lifecycle.PodAdmitAttributes{Pod: pod}
 | |
| 	// Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive.
 | |
| 	attrs.OtherPods = kl.filterOutTerminatedPods(kl.podManager.GetPods())
 | |
| 
 | |
| 	for _, handler := range kl.softAdmitHandlers {
 | |
| 		if result := handler.Admit(attrs); !result.Admit {
 | |
| 			return result
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// TODO: Refactor as a soft admit handler.
 | |
| 	if err := canRunPod(pod); err != nil {
 | |
| 		return lifecycle.PodAdmitResult{
 | |
| 			Admit:   false,
 | |
| 			Reason:  "Forbidden",
 | |
| 			Message: err.Error(),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return lifecycle.PodAdmitResult{Admit: true}
 | |
| }
 | |
| 
 | |
| // syncLoop is the main loop for processing changes. It watches for changes from
 | |
| // three channels (file, apiserver, and http) and creates a union of them. For
 | |
| // any new change seen, it will run a sync against desired state and running state. If
| // no changes are seen to the configuration, it will synchronize the last known desired
 | |
| // state every sync-frequency seconds. Never returns.
 | |
| func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
 | |
| 	glog.Info("Starting kubelet main sync loop.")
 | |
| 	// The syncTicker wakes up the kubelet to check if there are any pod workers
 | |
| 	// that need to be sync'd. A one-second period is sufficient because the
 | |
| 	// sync interval is defaulted to 10s.
 | |
| 	syncTicker := time.NewTicker(time.Second)
 | |
| 	defer syncTicker.Stop()
 | |
| 	housekeepingTicker := time.NewTicker(housekeepingPeriod)
 | |
| 	defer housekeepingTicker.Stop()
 | |
| 	plegCh := kl.pleg.Watch()
 | |
| 	for {
 | |
| 		if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
 | |
| 			glog.Infof("skipping pod synchronization - %v", rs)
 | |
| 			time.Sleep(5 * time.Second)
 | |
| 			continue
 | |
| 		}
 | |
| 		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // syncLoopIteration reads from various channels and dispatches pods to the
 | |
| // given handler.
 | |
| //
 | |
| // Arguments:
 | |
| // 1.  configCh:       a channel to read config events from
 | |
| // 2.  handler:        the SyncHandler to dispatch pods to
 | |
| // 3.  syncCh:         a channel to read periodic sync events from
 | |
| // 4.  houseKeepingCh: a channel to read housekeeping events from
 | |
| // 5.  plegCh:         a channel to read PLEG updates from
 | |
| //
 | |
| // Events are also read from the kubelet liveness manager's update channel.
 | |
| //
 | |
| // The workflow is to read from one of the channels, handle that event, and
 | |
| // update the timestamp in the sync loop monitor.
 | |
| //
 | |
| // Here is an appropriate place to note that despite the syntactical
 | |
| // similarity to the switch statement, the case statements in a select are
 | |
| // evaluated in a pseudorandom order if there are multiple channels ready to
 | |
| // read from when the select is evaluated.  In other words, case statements
 | |
| // are evaluated in random order, and you cannot assume that the case
 | |
| // statements evaluate in order if multiple channels have events.
 | |
| //
 | |
| // With that in mind, in truly no particular order, the different channels
 | |
| // are handled as follows:
 | |
| //
 | |
| // * configCh: dispatch the pods for the config change to the appropriate
 | |
| //             handler callback for the event type
 | |
| // * plegCh: update the runtime cache; sync pod
 | |
| // * syncCh: sync all pods waiting for sync
 | |
| // * houseKeepingCh: trigger cleanup of pods
 | |
| // * liveness manager: sync pods that have failed or in which one or more
 | |
| //                     containers have failed liveness checks
 | |
| func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
 | |
| 	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
 | |
| 	kl.syncLoopMonitor.Store(kl.clock.Now())
 | |
| 	select {
 | |
| 	case u, open := <-configCh:
 | |
| 		// Update from a config source; dispatch it to the right handler
 | |
| 		// callback.
 | |
| 		if !open {
 | |
| 			glog.Errorf("Update channel is closed. Exiting the sync loop.")
 | |
| 			return false
 | |
| 		}
 | |
| 
 | |
| 		switch u.Op {
 | |
| 		case kubetypes.ADD:
 | |
| 			glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
 | |
| 			// After restarting, kubelet will get all existing pods through
 | |
| 			// ADD as if they are new pods. These pods will then go through the
 | |
| 			// admission process and *may* be rejected. This can be resolved
 | |
| 			// once we have checkpointing.
 | |
| 			handler.HandlePodAdditions(u.Pods)
 | |
| 		case kubetypes.UPDATE:
 | |
| 			glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
 | |
| 			handler.HandlePodUpdates(u.Pods)
 | |
| 		case kubetypes.REMOVE:
 | |
| 			glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
 | |
| 			handler.HandlePodRemoves(u.Pods)
 | |
| 		case kubetypes.RECONCILE:
 | |
| 			glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
 | |
| 			handler.HandlePodReconcile(u.Pods)
 | |
| 		case kubetypes.DELETE:
 | |
| 			glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
 | |
| 			// DELETE is treated as an UPDATE because of graceful deletion.
 | |
| 			handler.HandlePodUpdates(u.Pods)
 | |
| 		case kubetypes.SET:
 | |
| 			// TODO: Do we want to support this?
 | |
| 			glog.Errorf("Kubelet does not support snapshot update")
 | |
| 		}
 | |
| 
 | |
| 		// Mark the source ready after receiving at least one update from the
 | |
| 		// source. Once all the sources are marked ready, various cleanup
 | |
| 		// routines will start reclaiming resources. It is important that this
 | |
| 		// takes place only after kubelet calls the update handler to process
 | |
| 		// the update to ensure the internal pod cache is up-to-date.
 | |
| 		kl.sourcesReady.AddSource(u.Source)
 | |
| 	case e := <-plegCh:
 | |
| 		if isSyncPodWorthy(e) {
 | |
| 			// PLEG event for a pod; sync it.
 | |
| 			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
 | |
| 				glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
 | |
| 				handler.HandlePodSyncs([]*v1.Pod{pod})
 | |
| 			} else {
 | |
| 				// If the pod no longer exists, ignore the event.
 | |
| 				glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if e.Type == pleg.ContainerDied {
 | |
| 			if containerID, ok := e.Data.(string); ok {
 | |
| 				kl.cleanUpContainersInPod(e.ID, containerID)
 | |
| 			}
 | |
| 		}
 | |
| 	case <-syncCh:
 | |
| 		// Sync pods waiting for sync
 | |
| 		podsToSync := kl.getPodsToSync()
 | |
| 		if len(podsToSync) == 0 {
 | |
| 			break
 | |
| 		}
 | |
| 		glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
 | |
| 		kl.HandlePodSyncs(podsToSync)
 | |
| 	case update := <-kl.livenessManager.Updates():
 | |
| 		if update.Result == proberesults.Failure {
 | |
| 			// The liveness manager detected a failure; sync the pod.
 | |
| 
 | |
| 			// We should not use the pod from livenessManager, because it is never updated after
 | |
| 			// initialization.
 | |
| 			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
 | |
| 			if !ok {
 | |
| 				// If the pod no longer exists, ignore the update.
 | |
| 				glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
 | |
| 				break
 | |
| 			}
 | |
| 			glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
 | |
| 			handler.HandlePodSyncs([]*v1.Pod{pod})
 | |
| 		}
 | |
| 	case <-housekeepingCh:
 | |
| 		if !kl.sourcesReady.AllReady() {
 | |
| 			// If the sources aren't ready or volume manager has not yet synced the states,
 | |
| 			// skip housekeeping, as we may accidentally delete pods from unready sources.
 | |
| 			glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
 | |
| 		} else {
 | |
| 			glog.V(4).Infof("SyncLoop (housekeeping)")
 | |
| 			if err := handler.HandlePodCleanups(); err != nil {
 | |
| 				glog.Errorf("Failed cleaning pods: %v", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	kl.syncLoopMonitor.Store(kl.clock.Now())
 | |
| 	return true
 | |
| }
 | |
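| 
| // The doc comment above notes that select chooses among ready channels
| // pseudorandomly. A small, self-contained illustration of that property in
| // plain Go (unrelated to any kubelet state):
| //
| //	a, b := make(chan string, 1), make(chan string, 1)
| //	a <- "from a"
| //	b <- "from b"
| //	select {
| //	case v := <-a:
| //		fmt.Println(v) // chosen on some runs
| //	case v := <-b:
| //		fmt.Println(v) // chosen on others; no ordering guarantee
| //	}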
| 
 | |
| // dispatchWork starts the asynchronous sync of the pod in a pod worker.
 | |
| // If the pod is terminated, dispatchWork does not start a pod worker; it only
| // forces a status update when the pod has a deletion timestamp.
 | |
| func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
 | |
| 	if kl.podIsTerminated(pod) {
 | |
| 		if pod.DeletionTimestamp != nil {
 | |
| 			// If the pod is in a terminated state, there is no pod worker to
 | |
| 			// handle the work item. Check if the DeletionTimestamp has been
 | |
| 			// set, and force a status update to trigger a pod deletion request
 | |
| 			// to the apiserver.
 | |
| 			kl.statusManager.TerminatePod(pod)
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 	// Run the sync in an async worker.
 | |
| 	kl.podWorkers.UpdatePod(&UpdatePodOptions{
 | |
| 		Pod:        pod,
 | |
| 		MirrorPod:  mirrorPod,
 | |
| 		UpdateType: syncType,
 | |
| 		OnCompleteFunc: func(err error) {
 | |
| 			if err != nil {
 | |
| 				metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
 | |
| 			}
 | |
| 		},
 | |
| 	})
 | |
| 	// Note the number of containers for new pods.
 | |
| 	if syncType == kubetypes.SyncPodCreate {
 | |
| 		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
 | |
| 	}
 | |
| }
 | |
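| 
| // dispatchWork measures pod worker latency through the OnCompleteFunc
| // callback, and only when the sync returned an error. A minimal sketch of the
| // same callback-with-latency pattern (doWork and the log line are
| // placeholders, not kubelet code):
| //
| //	start := time.Now()
| //	onComplete := func(err error) {
| //		if err != nil {
| //			glog.V(4).Infof("work failed after %v: %v", time.Since(start), err)
| //		}
| //	}
| //	onComplete(doWork())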
| 
 | |
| // TODO: handle mirror pods in a separate component (issue #17251)
 | |
| func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
 | |
| 	// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
 | |
| 	// corresponding static pod. Send update to the pod worker if the static
 | |
| 	// pod exists.
 | |
| 	if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
 | |
| 		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodAdditions is the callback in SyncHandler for pods being added from
 | |
| // a config source.
 | |
| func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 | |
| 	start := kl.clock.Now()
 | |
| 	sort.Sort(sliceutils.PodsByCreationTime(pods))
 | |
| 	for _, pod := range pods {
 | |
| 		existingPods := kl.podManager.GetPods()
 | |
| 		// Always add the pod to the pod manager. Kubelet relies on the pod
 | |
| 		// manager as the source of truth for the desired state. If a pod does
 | |
| 		// not exist in the pod manager, it means that it has been deleted in
 | |
| 		// the apiserver and no action (other than cleanup) is required.
 | |
| 		kl.podManager.AddPod(pod)
 | |
| 
 | |
| 		if kubepod.IsMirrorPod(pod) {
 | |
| 			kl.handleMirrorPod(pod, start)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if !kl.podIsTerminated(pod) {
 | |
| 			// Only go through the admission process if the pod is not
 | |
| 			// terminated.
 | |
| 
 | |
| 			// Rejected pods are failed, so activePods includes all admitted
 | |
| 			// pods that are alive.
 | |
| 			activePods := kl.filterOutTerminatedPods(existingPods)
 | |
| 
 | |
| 			// Check if we can admit the pod; if not, reject it.
 | |
| 			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
 | |
| 				kl.rejectPod(pod, reason, message)
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 | |
| 		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
 | |
| 		kl.probeManager.AddPod(pod)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodUpdates is the callback in the SyncHandler interface for pods
 | |
| // being updated from a config source.
 | |
| func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
 | |
| 	start := kl.clock.Now()
 | |
| 	for _, pod := range pods {
 | |
| 		kl.podManager.UpdatePod(pod)
 | |
| 		if kubepod.IsMirrorPod(pod) {
 | |
| 			kl.handleMirrorPod(pod, start)
 | |
| 			continue
 | |
| 		}
 | |
| 		// TODO: Evaluate if we need to validate and reject updates.
 | |
| 
 | |
| 		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 | |
| 		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodRemoves is the callback in the SyncHandler interface for pods
 | |
| // being removed from a config source.
 | |
| func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
 | |
| 	start := kl.clock.Now()
 | |
| 	for _, pod := range pods {
 | |
| 		kl.podManager.DeletePod(pod)
 | |
| 		if kubepod.IsMirrorPod(pod) {
 | |
| 			kl.handleMirrorPod(pod, start)
 | |
| 			continue
 | |
| 		}
 | |
| 		// Deletion is allowed to fail because the periodic cleanup routine
 | |
| 		// will trigger deletion again.
 | |
| 		if err := kl.deletePod(pod); err != nil {
 | |
| 			glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
 | |
| 		}
 | |
| 		kl.probeManager.RemovePod(pod)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodReconcile is the callback in the SyncHandler interface for pods
 | |
| // that should be reconciled.
 | |
| func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
 | |
| 	for _, pod := range pods {
 | |
| 		// Update the pod in the pod manager; the status manager will periodically reconcile according
 | |
| 		// to the pod manager.
 | |
| 		kl.podManager.UpdatePod(pod)
 | |
| 
 | |
| 		// After an evicted pod is synced, all dead containers in the pod can be removed.
 | |
| 		if eviction.PodIsEvicted(pod.Status) {
 | |
| 			if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
 | |
| 				kl.containerDeletor.deleteContainersInPod("", podStatus, true)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodSyncs is the callback in the syncHandler interface for pods
 | |
| // that should be dispatched to pod workers for sync.
 | |
| func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
 | |
| 	start := kl.clock.Now()
 | |
| 	for _, pod := range pods {
 | |
| 		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 | |
| 		kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // LatestLoopEntryTime returns the last time in the sync loop monitor.
 | |
| func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 | |
| 	val := kl.syncLoopMonitor.Load()
 | |
| 	if val == nil {
 | |
| 		return time.Time{}
 | |
| 	}
 | |
| 	return val.(time.Time)
 | |
| }
 | |
| 
 | |
| // updateRuntimeUp calls the container runtime status callback, initializing
 | |
| // the runtime dependent modules when the container runtime first comes up,
 | |
| // and logs an error if the status check fails. If the status check is OK,
| // it updates the container runtime uptime in the kubelet runtimeState.
 | |
| func (kl *Kubelet) updateRuntimeUp() {
 | |
| 	s, err := kl.containerRuntime.Status()
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Container runtime sanity check failed: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	// Only check specific conditions when runtime integration type is cri,
 | |
| 	// because the old integration doesn't populate any runtime condition.
 | |
| 	if kl.kubeletConfiguration.EnableCRI {
 | |
| 		if s == nil {
 | |
| 			glog.Errorf("Container runtime status is nil")
 | |
| 			return
 | |
| 		}
 | |
| 		// Periodically log the whole runtime status for debugging.
 | |
| 		// TODO(random-liu): Consider sending a node event when an optional
 | |
| 		// condition is unmet.
 | |
| 		glog.V(4).Infof("Container runtime status: %v", s)
 | |
| 		networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
 | |
| 		if networkReady == nil || !networkReady.Status {
 | |
| 			glog.Errorf("Container runtime network not ready: %v", networkReady)
 | |
| 			kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
 | |
| 		} else {
 | |
| 			// Set nil if the container runtime network is ready.
 | |
| 			kl.runtimeState.setNetworkState(nil)
 | |
| 		}
 | |
| 		// TODO(random-liu): Add runtime error in runtimeState, and update it
 | |
| 		// when runtime is not ready, so that the information in RuntimeReady
 | |
| 		// condition will be propagated to NodeReady condition.
 | |
| 		runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
 | |
| 		// If RuntimeReady is not set or is false, report an error.
 | |
| 		if runtimeReady == nil || !runtimeReady.Status {
 | |
| 			glog.Errorf("Container runtime not ready: %v", runtimeReady)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
 | |
| 	kl.runtimeState.setRuntimeSync(kl.clock.Now())
 | |
| }
 | |
| 
 | |
| // updateCloudProviderFromMachineInfo updates the node's provider ID field
 | |
| // from the given cadvisor machine info.
 | |
| func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *v1.Node, info *cadvisorapi.MachineInfo) {
 | |
| 	if info.CloudProvider != cadvisorapi.UnknownProvider &&
 | |
| 		info.CloudProvider != cadvisorapi.Baremetal {
 | |
| 		// The cloud providers from pkg/cloudprovider/providers/* that update ProviderID
 | |
| 		// will use the format of cloudprovider://project/availability_zone/instance_name
 | |
| 		// here we only have the cloudprovider and the instance name so we leave project
 | |
| 		// and availability zone empty for compatibility.
 | |
| 		node.Spec.ProviderID = strings.ToLower(string(info.CloudProvider)) +
 | |
| 			":////" + string(info.InstanceID)
 | |
| 	}
 | |
| }
 | |
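| 
| // The ProviderID built above leaves the project and availability-zone
| // segments empty for compatibility. For example (values illustrative), a
| // MachineInfo whose CloudProvider lowercases to "aws" and whose InstanceID is
| // "i-0abc123" yields:
| //
| //	node.Spec.ProviderID == "aws:////i-0abc123"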
| 
 | |
| // GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
 | |
| func (kl *Kubelet) GetConfiguration() componentconfig.KubeletConfiguration {
 | |
| 	return kl.kubeletConfiguration
 | |
| }
 | |
| 
 | |
| // BirthCry sends an event that the kubelet has started up.
 | |
| func (kl *Kubelet) BirthCry() {
 | |
| 	// Make an event that kubelet restarted.
 | |
| 	kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
 | |
| }
 | |
| 
 | |
| // StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server.
 | |
| func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
 | |
| 	return kl.streamingConnectionIdleTimeout
 | |
| }
 | |
| 
 | |
| // ResyncInterval returns the interval used for periodic syncs.
 | |
| func (kl *Kubelet) ResyncInterval() time.Duration {
 | |
| 	return kl.resyncInterval
 | |
| }
 | |
| 
 | |
| // ListenAndServe runs the kubelet HTTP server.
 | |
| func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
 | |
| 	server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime, kl.criHandler)
 | |
| }
 | |
| 
 | |
| // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
 | |
| func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
 | |
| 	server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
 | |
| }
 | |
| 
 | |
| // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
 | |
| func (kl *Kubelet) cleanUpContainersInPod(podId types.UID, exitedContainerID string) {
 | |
| 	if podStatus, err := kl.podCache.Get(podId); err == nil {
 | |
| 		removeAll := false
 | |
| 		if syncedPod, ok := kl.podManager.GetPodByUID(podId); ok {
 | |
| 			// When an evicted pod has already synced, all containers can be removed.
 | |
| 			removeAll = eviction.PodIsEvicted(syncedPod.Status)
 | |
| 		}
 | |
| 		kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // isSyncPodWorthy filters out events that are not worthy of pod syncing
 | |
| func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
 | |
| 	// ContainerRemoved doesn't affect pod state
 | |
| 	return event.Type != pleg.ContainerRemoved
 | |
| }
 | |
| 
 | |
| // parseResourceList parses the given configuration map into an API
 | |
| // ResourceList or returns an error.
 | |
| func parseResourceList(m componentconfig.ConfigurationMap) (v1.ResourceList, error) {
 | |
| 	rl := make(v1.ResourceList)
 | |
| 	for k, v := range m {
 | |
| 		switch v1.ResourceName(k) {
 | |
| 		// Only CPU and memory resources are supported.
 | |
| 		case v1.ResourceCPU, v1.ResourceMemory:
 | |
| 			q, err := resource.ParseQuantity(v)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			if q.Sign() == -1 {
 | |
| 				return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
 | |
| 			}
 | |
| 			rl[v1.ResourceName(k)] = q
 | |
| 		default:
 | |
| 			return nil, fmt.Errorf("cannot reserve %q resource", k)
 | |
| 		}
 | |
| 	}
 | |
| 	return rl, nil
 | |
| }
 | |
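| 
| // parseResourceList accepts only cpu and memory keys and rejects negative
| // quantities. A brief sketch of both outcomes (example values only):
| //
| //	rl, err := parseResourceList(componentconfig.ConfigurationMap{"cpu": "500m"})
| //	// rl[v1.ResourceCPU] holds 500m, err == nil
| //
| //	_, err = parseResourceList(componentconfig.ConfigurationMap{"storage": "1Gi"})
| //	// err != nil: cannot reserve "storage" resource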
| 
 | |
| // ParseReservation parses the given kubelet- and system- reservations
 | |
| // configuration maps into an internal Reservation instance or returns an
 | |
| // error.
 | |
| func ParseReservation(kubeReserved, systemReserved componentconfig.ConfigurationMap) (*kubetypes.Reservation, error) {
 | |
| 	reservation := new(kubetypes.Reservation)
 | |
| 	if rl, err := parseResourceList(kubeReserved); err != nil {
 | |
| 		return nil, err
 | |
| 	} else {
 | |
| 		reservation.Kubernetes = rl
 | |
| 	}
 | |
| 	if rl, err := parseResourceList(systemReserved); err != nil {
 | |
| 		return nil, err
 | |
| 	} else {
 | |
| 		reservation.System = rl
 | |
| 	}
 | |
| 	return reservation, nil
 | |
| }
 | |
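| 
| // A short usage sketch for ParseReservation with flag-style configuration
| // maps (the quantities are example values, not defaults):
| //
| //	kubeReserved := componentconfig.ConfigurationMap{"cpu": "100m", "memory": "256Mi"}
| //	systemReserved := componentconfig.ConfigurationMap{"cpu": "200m"}
| //	reservation, err := ParseReservation(kubeReserved, systemReserved)
| //	if err == nil {
| //		cpuQty := reservation.Kubernetes[v1.ResourceCPU] // 100m as a resource.Quantity
| //		_ = cpuQty
| //	}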
| 
 | |
| // Gets the streaming server configuration to use with in-process CRI shims.
 | |
| func getStreamingConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps) *streaming.Config {
 | |
| 	config := &streaming.Config{
 | |
| 		// Use a relative redirect (no scheme or host).
 | |
| 		BaseURL: &url.URL{
 | |
| 			Path: "/cri/",
 | |
| 		},
 | |
| 		StreamIdleTimeout:               kubeCfg.StreamingConnectionIdleTimeout.Duration,
 | |
| 		StreamCreationTimeout:           streaming.DefaultConfig.StreamCreationTimeout,
 | |
| 		SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
 | |
| 		SupportedPortForwardProtocols:   streaming.DefaultConfig.SupportedPortForwardProtocols,
 | |
| 	}
 | |
| 	if kubeDeps.TLSOptions != nil {
 | |
| 		config.TLSConfig = kubeDeps.TLSOptions.Config
 | |
| 	}
 | |
| 	return config
 | |
| }
 |