mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Add daemon manager
This commit is contained in:
		
				
					committed by
					
						
						Mike Danese
					
				
			
			
				
	
			
			
			
						parent
						
							d59f742914
						
					
				
				
					commit
					c7628fae43
				
			@@ -35,6 +35,7 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
 | 
						"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
 | 
				
			||||||
	clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
 | 
						clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/cloudprovider"
 | 
						"k8s.io/kubernetes/pkg/cloudprovider"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/controller/daemon"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/controller/endpoint"
 | 
						"k8s.io/kubernetes/pkg/controller/endpoint"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/controller/namespace"
 | 
						"k8s.io/kubernetes/pkg/controller/namespace"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/controller/node"
 | 
						"k8s.io/kubernetes/pkg/controller/node"
 | 
				
			||||||
@@ -63,6 +64,7 @@ type CMServer struct {
 | 
				
			|||||||
	CloudConfigFile                   string
 | 
						CloudConfigFile                   string
 | 
				
			||||||
	ConcurrentEndpointSyncs           int
 | 
						ConcurrentEndpointSyncs           int
 | 
				
			||||||
	ConcurrentRCSyncs                 int
 | 
						ConcurrentRCSyncs                 int
 | 
				
			||||||
 | 
						ConcurrentDCSyncs                 int
 | 
				
			||||||
	ServiceSyncPeriod                 time.Duration
 | 
						ServiceSyncPeriod                 time.Duration
 | 
				
			||||||
	NodeSyncPeriod                    time.Duration
 | 
						NodeSyncPeriod                    time.Duration
 | 
				
			||||||
	ResourceQuotaSyncPeriod           time.Duration
 | 
						ResourceQuotaSyncPeriod           time.Duration
 | 
				
			||||||
@@ -98,6 +100,7 @@ func NewCMServer() *CMServer {
 | 
				
			|||||||
		Address:                           net.ParseIP("127.0.0.1"),
 | 
							Address:                           net.ParseIP("127.0.0.1"),
 | 
				
			||||||
		ConcurrentEndpointSyncs:           5,
 | 
							ConcurrentEndpointSyncs:           5,
 | 
				
			||||||
		ConcurrentRCSyncs:                 5,
 | 
							ConcurrentRCSyncs:                 5,
 | 
				
			||||||
 | 
							ConcurrentDCSyncs:                 2,
 | 
				
			||||||
		ServiceSyncPeriod:                 5 * time.Minute,
 | 
							ServiceSyncPeriod:                 5 * time.Minute,
 | 
				
			||||||
		NodeSyncPeriod:                    10 * time.Second,
 | 
							NodeSyncPeriod:                    10 * time.Second,
 | 
				
			||||||
		ResourceQuotaSyncPeriod:           10 * time.Second,
 | 
							ResourceQuotaSyncPeriod:           10 * time.Second,
 | 
				
			||||||
@@ -213,6 +216,9 @@ func (s *CMServer) Run(_ []string) error {
 | 
				
			|||||||
	controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient, replicationControllerPkg.BurstReplicas)
 | 
						controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient, replicationControllerPkg.BurstReplicas)
 | 
				
			||||||
	go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop)
 | 
						go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						daemonManager := daemon.NewDaemonManager(kubeClient)
 | 
				
			||||||
 | 
						go daemonManager.Run(s.ConcurrentDCSyncs, util.NeverStop)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
 | 
						cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		glog.Fatalf("Cloud provider could not be initialized: %v", err)
 | 
							glog.Fatalf("Cloud provider could not be initialized: %v", err)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,6 +20,8 @@ import (
 | 
				
			|||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"sync/atomic"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/golang/glog"
 | 
						"github.com/golang/glog"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api"
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/latest"
 | 
						"k8s.io/kubernetes/pkg/api/latest"
 | 
				
			||||||
@@ -28,9 +30,9 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/client/record"
 | 
						"k8s.io/kubernetes/pkg/client/record"
 | 
				
			||||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
						client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/controller/framework"
 | 
						"k8s.io/kubernetes/pkg/controller/framework"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/expapi"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/labels"
 | 
						"k8s.io/kubernetes/pkg/labels"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
						"k8s.io/kubernetes/pkg/runtime"
 | 
				
			||||||
	"sync/atomic"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
@@ -213,6 +215,8 @@ func NewControllerExpectations() *ControllerExpectations {
 | 
				
			|||||||
type PodControlInterface interface {
 | 
					type PodControlInterface interface {
 | 
				
			||||||
	// CreateReplica creates new replicated pods according to the spec.
 | 
						// CreateReplica creates new replicated pods according to the spec.
 | 
				
			||||||
	CreateReplica(namespace string, controller *api.ReplicationController) error
 | 
						CreateReplica(namespace string, controller *api.ReplicationController) error
 | 
				
			||||||
 | 
						// CreateReplicaOnNodes creates a new pod according to the spec, on a specified list of nodes.
 | 
				
			||||||
 | 
						CreateReplicaOnNode(namespace string, controller *expapi.DaemonSet, nodeNames string) error
 | 
				
			||||||
	// DeletePod deletes the pod identified by podID.
 | 
						// DeletePod deletes the pod identified by podID.
 | 
				
			||||||
	DeletePod(namespace string, podID string) error
 | 
						DeletePod(namespace string, podID string) error
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -290,6 +294,39 @@ func (r RealPodControl) CreateReplica(namespace string, controller *api.Replicat
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (r RealPodControl) CreateReplicaOnNode(namespace string, controller *expapi.DaemonSet, nodeName string) error {
 | 
				
			||||||
 | 
						desiredLabels := getReplicaLabelSet(controller.Spec.Template)
 | 
				
			||||||
 | 
						desiredAnnotations, err := getReplicaAnnotationSet(controller.Spec.Template, controller)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						prefix := getReplicaPrefix(controller.Name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pod := &api.Pod{
 | 
				
			||||||
 | 
							ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
 | 
								Labels:       desiredLabels,
 | 
				
			||||||
 | 
								Annotations:  desiredAnnotations,
 | 
				
			||||||
 | 
								GenerateName: prefix,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := api.Scheme.Convert(&controller.Spec.Template.Spec, &pod.Spec); err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("unable to convert pod template: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if labels.Set(pod.Labels).AsSelector().Empty() {
 | 
				
			||||||
 | 
							return fmt.Errorf("unable to create pod replica, no labels")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pod.Spec.NodeName = nodeName
 | 
				
			||||||
 | 
						if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil {
 | 
				
			||||||
 | 
							r.Recorder.Eventf(controller, "failedCreate", "Error creating: %v", err)
 | 
				
			||||||
 | 
							return fmt.Errorf("unable to create pod replica: %v", err)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							glog.V(4).Infof("Controller %v created pod %v", controller.Name, newPod.Name)
 | 
				
			||||||
 | 
							r.Recorder.Eventf(controller, "successfulCreate", "Created pod: %v", newPod.Name)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r RealPodControl) DeletePod(namespace, podID string) error {
 | 
					func (r RealPodControl) DeletePod(namespace, podID string) error {
 | 
				
			||||||
	return r.KubeClient.Pods(namespace).Delete(podID, nil)
 | 
						return r.KubeClient.Pods(namespace).Delete(podID, nil)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										19
									
								
								pkg/controller/daemon/doc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								pkg/controller/daemon/doc.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,19 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Package daemon contains logic for watching and synchronizing
 | 
				
			||||||
 | 
					// daemons.
 | 
				
			||||||
 | 
					package daemon
 | 
				
			||||||
							
								
								
									
										475
									
								
								pkg/controller/daemon/manager.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										475
									
								
								pkg/controller/daemon/manager.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,475 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package daemon
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"reflect"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/golang/glog"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
 | 
						client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/client/unversioned/cache"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/client/unversioned/record"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/controller"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/controller/framework"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/expapi"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/fields"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/labels"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/runtime"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/util"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/util/workqueue"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/watch"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						// Daemons will periodically check that their daemon pods are running as expected.
 | 
				
			||||||
 | 
						FullDaemonResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.
 | 
				
			||||||
 | 
						// Nodes don't need relisting.
 | 
				
			||||||
 | 
						FullNodeResyncPeriod = 0
 | 
				
			||||||
 | 
						// Daemon pods don't need relisting.
 | 
				
			||||||
 | 
						FullDaemonPodResyncPeriod = 0
 | 
				
			||||||
 | 
						// If sending a status upate to API server fails, we retry a finite number of times.
 | 
				
			||||||
 | 
						StatusUpdateRetries = 1
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type DaemonManager struct {
 | 
				
			||||||
 | 
						kubeClient client.Interface
 | 
				
			||||||
 | 
						podControl controller.PodControlInterface
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// To allow injection of syncDaemon for testing.
 | 
				
			||||||
 | 
						syncHandler func(dcKey string) error
 | 
				
			||||||
 | 
						// A TTLCache of pod creates/deletes each dc expects to see
 | 
				
			||||||
 | 
						expectations controller.ControllerExpectationsInterface
 | 
				
			||||||
 | 
						// A store of daemons, populated by the podController.
 | 
				
			||||||
 | 
						dcStore cache.StoreToDaemonSetLister
 | 
				
			||||||
 | 
						// A store of pods, populated by the podController
 | 
				
			||||||
 | 
						podStore cache.StoreToPodLister
 | 
				
			||||||
 | 
						// A store of pods, populated by the podController
 | 
				
			||||||
 | 
						nodeStore cache.StoreToNodeLister
 | 
				
			||||||
 | 
						// Watches changes to all pods.
 | 
				
			||||||
 | 
						dcController *framework.Controller
 | 
				
			||||||
 | 
						// Watches changes to all pods
 | 
				
			||||||
 | 
						podController *framework.Controller
 | 
				
			||||||
 | 
						// Watches changes to all nodes.
 | 
				
			||||||
 | 
						nodeController *framework.Controller
 | 
				
			||||||
 | 
						// Controllers that need to be updated.
 | 
				
			||||||
 | 
						queue *workqueue.Type
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewDaemonManager(kubeClient client.Interface) *DaemonManager {
 | 
				
			||||||
 | 
						eventBroadcaster := record.NewBroadcaster()
 | 
				
			||||||
 | 
						eventBroadcaster.StartLogging(glog.Infof)
 | 
				
			||||||
 | 
						eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						dm := &DaemonManager{
 | 
				
			||||||
 | 
							kubeClient: kubeClient,
 | 
				
			||||||
 | 
							podControl: controller.RealPodControl{
 | 
				
			||||||
 | 
								KubeClient: kubeClient,
 | 
				
			||||||
 | 
								Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon"}),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							expectations: controller.NewControllerExpectations(),
 | 
				
			||||||
 | 
							queue:        workqueue.New(),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Manage addition/update of daemon controllers.
 | 
				
			||||||
 | 
						dm.dcStore.Store, dm.dcController = framework.NewInformer(
 | 
				
			||||||
 | 
							&cache.ListWatch{
 | 
				
			||||||
 | 
								ListFunc: func() (runtime.Object, error) {
 | 
				
			||||||
 | 
									return dm.kubeClient.Experimental().Daemons(api.NamespaceAll).List(labels.Everything())
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								WatchFunc: func(rv string) (watch.Interface, error) {
 | 
				
			||||||
 | 
									return dm.kubeClient.Experimental().Daemons(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							&expapi.DaemonSet{},
 | 
				
			||||||
 | 
							FullDaemonResyncPeriod,
 | 
				
			||||||
 | 
							framework.ResourceEventHandlerFuncs{
 | 
				
			||||||
 | 
								AddFunc: func(obj interface{}) {
 | 
				
			||||||
 | 
									daemon := obj.(*expapi.DaemonSet)
 | 
				
			||||||
 | 
									glog.V(4).Infof("Adding daemon %s", daemon.Name)
 | 
				
			||||||
 | 
									dm.enqueueController(obj)
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								UpdateFunc: func(old, cur interface{}) {
 | 
				
			||||||
 | 
									oldDaemon := old.(*expapi.DaemonSet)
 | 
				
			||||||
 | 
									glog.V(4).Infof("Updating daemon %s", oldDaemon.Name)
 | 
				
			||||||
 | 
									dm.enqueueController(cur)
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								DeleteFunc: func(obj interface{}) {
 | 
				
			||||||
 | 
									daemon := obj.(*expapi.DaemonSet)
 | 
				
			||||||
 | 
									glog.V(4).Infof("Deleting daemon %s", daemon.Name)
 | 
				
			||||||
 | 
									dm.enqueueController(obj)
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon controller to create/delete
 | 
				
			||||||
 | 
						// more pods until all the effects (expectations) of a daemon controller's create/delete have been observed.
 | 
				
			||||||
 | 
						dm.podStore.Store, dm.podController = framework.NewInformer(
 | 
				
			||||||
 | 
							&cache.ListWatch{
 | 
				
			||||||
 | 
								ListFunc: func() (runtime.Object, error) {
 | 
				
			||||||
 | 
									return dm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								WatchFunc: func(rv string) (watch.Interface, error) {
 | 
				
			||||||
 | 
									return dm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							&api.Pod{},
 | 
				
			||||||
 | 
							FullDaemonPodResyncPeriod,
 | 
				
			||||||
 | 
							framework.ResourceEventHandlerFuncs{
 | 
				
			||||||
 | 
								AddFunc:    dm.addPod,
 | 
				
			||||||
 | 
								UpdateFunc: dm.updatePod,
 | 
				
			||||||
 | 
								DeleteFunc: dm.deletePod,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						// Watch for new nodes or updates to nodes - daemons are launched on new nodes, and possibly when labels on nodes change,
 | 
				
			||||||
 | 
						dm.nodeStore.Store, dm.nodeController = framework.NewInformer(
 | 
				
			||||||
 | 
							&cache.ListWatch{
 | 
				
			||||||
 | 
								ListFunc: func() (runtime.Object, error) {
 | 
				
			||||||
 | 
									return dm.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								WatchFunc: func(rv string) (watch.Interface, error) {
 | 
				
			||||||
 | 
									return dm.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv)
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							&api.Node{},
 | 
				
			||||||
 | 
							FullNodeResyncPeriod,
 | 
				
			||||||
 | 
							framework.ResourceEventHandlerFuncs{
 | 
				
			||||||
 | 
								AddFunc:    dm.addNode,
 | 
				
			||||||
 | 
								UpdateFunc: dm.updateNode,
 | 
				
			||||||
 | 
								DeleteFunc: func(node interface{}) {},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						dm.syncHandler = dm.syncDaemon
 | 
				
			||||||
 | 
						return dm
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Run begins watching and syncing daemons.
 | 
				
			||||||
 | 
					func (dm *DaemonManager) Run(workers int, stopCh <-chan struct{}) {
 | 
				
			||||||
 | 
						go dm.dcController.Run(stopCh)
 | 
				
			||||||
 | 
						go dm.podController.Run(stopCh)
 | 
				
			||||||
 | 
						go dm.nodeController.Run(stopCh)
 | 
				
			||||||
 | 
						for i := 0; i < workers; i++ {
 | 
				
			||||||
 | 
							go util.Until(dm.worker, time.Second, stopCh)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						<-stopCh
 | 
				
			||||||
 | 
						glog.Infof("Shutting down Daemon Controller Manager")
 | 
				
			||||||
 | 
						dm.queue.ShutDown()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) worker() {
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							func() {
 | 
				
			||||||
 | 
								key, quit := dm.queue.Get()
 | 
				
			||||||
 | 
								if quit {
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								defer dm.queue.Done(key)
 | 
				
			||||||
 | 
								err := dm.syncHandler(key.(string))
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									glog.Errorf("Error syncing daemon controller with key %s: %v", key.(string), err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) enqueueAllDaemons() {
 | 
				
			||||||
 | 
						glog.V(4).Infof("Enqueueing all daemons")
 | 
				
			||||||
 | 
						daemons, err := dm.dcStore.List()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("Error enqueueing daemon controllers: %v", err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for i := range daemons {
 | 
				
			||||||
 | 
							dm.enqueueController(&daemons[i])
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) enqueueController(obj interface{}) {
 | 
				
			||||||
 | 
						key, err := controller.KeyFunc(obj)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						dm.queue.Add(key)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) getPodDaemon(pod *api.Pod) *expapi.DaemonSet {
 | 
				
			||||||
 | 
						controllers, err := dm.dcStore.GetPodDaemonSets(pod)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.V(4).Infof("No controllers found for pod %v, daemon manager will avoid syncing", pod.Name)
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &controllers[0]
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) addPod(obj interface{}) {
 | 
				
			||||||
 | 
						pod := obj.(*api.Pod)
 | 
				
			||||||
 | 
						glog.V(4).Infof("Pod %s added.", pod.Name)
 | 
				
			||||||
 | 
						if dc := dm.getPodDaemon(pod); dc != nil {
 | 
				
			||||||
 | 
							dcKey, err := controller.KeyFunc(dc)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							dm.expectations.CreationObserved(dcKey)
 | 
				
			||||||
 | 
							dm.enqueueController(dc)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// When a pod is updated, figure out what controller/s manage it and wake them
 | 
				
			||||||
 | 
					// up. If the labels of the pod have changed we need to awaken both the old
 | 
				
			||||||
 | 
					// and new controller. old and cur must be *api.Pod types.
 | 
				
			||||||
 | 
					func (dm *DaemonManager) updatePod(old, cur interface{}) {
 | 
				
			||||||
 | 
						if api.Semantic.DeepEqual(old, cur) {
 | 
				
			||||||
 | 
							// A periodic relist will send update events for all known pods.
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						curPod := cur.(*api.Pod)
 | 
				
			||||||
 | 
						glog.V(4).Infof("Pod %s updated.", curPod.Name)
 | 
				
			||||||
 | 
						if dc := dm.getPodDaemon(curPod); dc != nil {
 | 
				
			||||||
 | 
							dm.enqueueController(dc)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						oldPod := old.(*api.Pod)
 | 
				
			||||||
 | 
						// If the labels have not changed, then the daemon controller responsible for
 | 
				
			||||||
 | 
						// the pod is the same as it was before. In that case we have enqueued the daemon
 | 
				
			||||||
 | 
						// controller above, and do not have to enqueue the controller again.
 | 
				
			||||||
 | 
						if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
 | 
				
			||||||
 | 
							// If the old and new dc are the same, the first one that syncs
 | 
				
			||||||
 | 
							// will set expectations preventing any damage from the second.
 | 
				
			||||||
 | 
							if oldRC := dm.getPodDaemon(oldPod); oldRC != nil {
 | 
				
			||||||
 | 
								dm.enqueueController(oldRC)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) deletePod(obj interface{}) {
 | 
				
			||||||
 | 
						pod, ok := obj.(*api.Pod)
 | 
				
			||||||
 | 
						glog.V(4).Infof("Pod %s deleted.", pod.Name)
 | 
				
			||||||
 | 
						// When a delete is dropped, the relist will notice a pod in the store not
 | 
				
			||||||
 | 
						// in the list, leading to the insertion of a tombstone object which contains
 | 
				
			||||||
 | 
						// the deleted key/value. Note that this value might be stale. If the pod
 | 
				
			||||||
 | 
						// changed labels the new rc will not be woken up till the periodic resync.
 | 
				
			||||||
 | 
						if !ok {
 | 
				
			||||||
 | 
							tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								glog.Errorf("Couldn't get object from tombstone %+v", obj)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							pod, ok = tombstone.Obj.(*api.Pod)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if dc := dm.getPodDaemon(pod); dc != nil {
 | 
				
			||||||
 | 
							dcKey, err := controller.KeyFunc(dc)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							dm.expectations.DeletionObserved(dcKey)
 | 
				
			||||||
 | 
							dm.enqueueController(dc)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) addNode(obj interface{}) {
 | 
				
			||||||
 | 
						// TODO: it'd be nice to pass a hint with these enqueues, so that each dc would only examine the added node (unless it has other work to do, too).
 | 
				
			||||||
 | 
						dm.enqueueAllDaemons()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) updateNode(old, cur interface{}) {
 | 
				
			||||||
 | 
						oldNode := old.(*api.Node)
 | 
				
			||||||
 | 
						curNode := cur.(*api.Node)
 | 
				
			||||||
 | 
						if api.Semantic.DeepEqual(oldNode.Name, curNode.Name) && api.Semantic.DeepEqual(oldNode.Namespace, curNode.Namespace) && api.Semantic.DeepEqual(oldNode.Labels, curNode.Labels) {
 | 
				
			||||||
 | 
							// A periodic relist will send update events for all known pods.
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// TODO: it'd be nice to pass a hint with these enqueues, so that each dc would only examine the added node (unless it has other work to do, too).
 | 
				
			||||||
 | 
						dm.enqueueAllDaemons()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to dc) running on the nodes.
 | 
				
			||||||
 | 
					func (dm *DaemonManager) getNodesToDaemonPods(dc *expapi.DaemonSet) (map[string][]*api.Pod, error) {
 | 
				
			||||||
 | 
						nodeToDaemonPods := make(map[string][]*api.Pod)
 | 
				
			||||||
 | 
						daemonPods, err := dm.podStore.Pods(dc.Namespace).List(labels.Set(dc.Spec.Selector).AsSelector())
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nodeToDaemonPods, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for i := range daemonPods.Items {
 | 
				
			||||||
 | 
							nodeName := daemonPods.Items[i].Spec.NodeName
 | 
				
			||||||
 | 
							nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], &daemonPods.Items[i])
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nodeToDaemonPods, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) manageDaemons(dc *expapi.DaemonSet) {
 | 
				
			||||||
 | 
						// Find out which nodes are running the daemon pods selected by dc.
 | 
				
			||||||
 | 
						nodeToDaemonPods, err := dm.getNodesToDaemonPods(dc)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("Error getting node to daemon pod mapping for daemon controller %+v: %v", dc, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
 | 
				
			||||||
 | 
						// pod. If the node is supposed to run the daemon, but isn't, create the daemon on the node.
 | 
				
			||||||
 | 
						nodeList, err := dm.nodeStore.List()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("Couldn't get list of nodes when adding daemon controller %+v: %v", dc, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var nodesNeedingDaemons, podsToDelete []string
 | 
				
			||||||
 | 
						for i := range nodeList.Items {
 | 
				
			||||||
 | 
							// Check if the node satisfies the daemon's node selector.
 | 
				
			||||||
 | 
							nodeSelector := labels.Set(dc.Spec.Template.Spec.NodeSelector).AsSelector()
 | 
				
			||||||
 | 
							shouldRun := nodeSelector.Matches(labels.Set(nodeList.Items[i].Labels))
 | 
				
			||||||
 | 
							// If the daemon specifies a node name, check that it matches with nodeName.
 | 
				
			||||||
 | 
							nodeName := nodeList.Items[i].Name
 | 
				
			||||||
 | 
							shouldRun = shouldRun && (dc.Spec.Template.Spec.NodeName == "" || dc.Spec.Template.Spec.NodeName == nodeName)
 | 
				
			||||||
 | 
							daemonPods, isRunning := nodeToDaemonPods[nodeName]
 | 
				
			||||||
 | 
							if shouldRun && !isRunning {
 | 
				
			||||||
 | 
								// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
 | 
				
			||||||
 | 
								nodesNeedingDaemons = append(nodesNeedingDaemons, nodeName)
 | 
				
			||||||
 | 
							} else if shouldRun && len(daemonPods) > 1 {
 | 
				
			||||||
 | 
								// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
 | 
				
			||||||
 | 
								// TODO: sort the daemon pods by creation time, so the the oldest is preserved.
 | 
				
			||||||
 | 
								for i := 1; i < len(daemonPods); i++ {
 | 
				
			||||||
 | 
									podsToDelete = append(podsToDelete, daemonPods[i].Name)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							} else if !shouldRun && isRunning {
 | 
				
			||||||
 | 
								// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
 | 
				
			||||||
 | 
								for i := range daemonPods {
 | 
				
			||||||
 | 
									podsToDelete = append(podsToDelete, daemonPods[i].Name)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// We need to set expectations before creating/deleting pods to avoid race conditions.
 | 
				
			||||||
 | 
						dcKey, err := controller.KeyFunc(dc)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						dm.expectations.SetExpectations(dcKey, len(nodesNeedingDaemons), len(podsToDelete))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						glog.V(4).Infof("Nodes needing daemons for daemon %s: %+v", dc.Name, nodesNeedingDaemons)
 | 
				
			||||||
 | 
						for i := range nodesNeedingDaemons {
 | 
				
			||||||
 | 
							if err := dm.podControl.CreateReplicaOnNode(dc.Namespace, dc, nodesNeedingDaemons[i]); err != nil {
 | 
				
			||||||
 | 
								glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", dc.Namespace, dc.Name)
 | 
				
			||||||
 | 
								dm.expectations.CreationObserved(dcKey)
 | 
				
			||||||
 | 
								util.HandleError(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						glog.V(4).Infof("Pods to delete for daemon %s: %+v", dc.Name, podsToDelete)
 | 
				
			||||||
 | 
						for i := range podsToDelete {
 | 
				
			||||||
 | 
							if err := dm.podControl.DeletePod(dc.Namespace, podsToDelete[i]); err != nil {
 | 
				
			||||||
 | 
								glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", dc.Namespace, dc.Name)
 | 
				
			||||||
 | 
								dm.expectations.DeletionObserved(dcKey)
 | 
				
			||||||
 | 
								util.HandleError(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func storeDaemonStatus(dcClient client.DaemonSetInterface, dc *expapi.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
 | 
				
			||||||
 | 
						if dc.Status.DesiredNumberScheduled == desiredNumberScheduled && dc.Status.CurrentNumberScheduled == currentNumberScheduled && dc.Status.NumberMisscheduled == numberMisscheduled {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var updateErr, getErr error
 | 
				
			||||||
 | 
						for i := 0; i <= StatusUpdateRetries; i++ {
 | 
				
			||||||
 | 
							dc.Status.DesiredNumberScheduled = desiredNumberScheduled
 | 
				
			||||||
 | 
							dc.Status.CurrentNumberScheduled = currentNumberScheduled
 | 
				
			||||||
 | 
							dc.Status.NumberMisscheduled = numberMisscheduled
 | 
				
			||||||
 | 
							_, updateErr := dcClient.Update(dc)
 | 
				
			||||||
 | 
							if updateErr == nil {
 | 
				
			||||||
 | 
								return updateErr
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// Update the controller with the latest resource version for the next poll
 | 
				
			||||||
 | 
							if dc, getErr = dcClient.Get(dc.Name); getErr != nil {
 | 
				
			||||||
 | 
								// If the GET fails we can't trust status.Replicas anymore. This error
 | 
				
			||||||
 | 
								// is bound to be more interesting than the update failure.
 | 
				
			||||||
 | 
								return getErr
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Failed 2 updates one of which was with the latest controller, return the update error
 | 
				
			||||||
 | 
						return updateErr
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) updateDaemonStatus(dc *expapi.DaemonSet) {
 | 
				
			||||||
 | 
						glog.Infof("Updating daemon status")
 | 
				
			||||||
 | 
						nodeToDaemonPods, err := dm.getNodesToDaemonPods(dc)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("Error getting node to daemon pod mapping for daemon %+v: %v", dc, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						nodeList, err := dm.nodeStore.List()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("Couldn't get list of nodes when adding daemon %+v: %v", dc, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int
 | 
				
			||||||
 | 
						for i := range nodeList.Items {
 | 
				
			||||||
 | 
							nodeSelector := labels.Set(dc.Spec.Template.Spec.NodeSelector).AsSelector()
 | 
				
			||||||
 | 
							shouldRun := nodeSelector.Matches(labels.Set(nodeList.Items[i].Labels))
 | 
				
			||||||
 | 
							numDaemonPods := len(nodeToDaemonPods[nodeList.Items[i].Name])
 | 
				
			||||||
 | 
							if shouldRun {
 | 
				
			||||||
 | 
								desiredNumberScheduled++
 | 
				
			||||||
 | 
								if numDaemonPods == 1 {
 | 
				
			||||||
 | 
									currentNumberScheduled++
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							} else if numDaemonPods >= 1 {
 | 
				
			||||||
 | 
								numberMisscheduled++
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err = storeDaemonStatus(dm.kubeClient.Experimental().Daemons(dc.Namespace), dc, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("Error storing status for daemon %+v: %v", dc, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (dm *DaemonManager) syncDaemon(key string) error {
 | 
				
			||||||
 | 
						startTime := time.Now()
 | 
				
			||||||
 | 
						defer func() {
 | 
				
			||||||
 | 
							glog.V(4).Infof("Finished syncing daemon %q (%v)", key, time.Now().Sub(startTime))
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
						obj, exists, err := dm.dcStore.Store.GetByKey(key)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Infof("Unable to retrieve dc %v from store: %v", key, err)
 | 
				
			||||||
 | 
							dm.queue.Add(key)
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if !exists {
 | 
				
			||||||
 | 
							glog.V(3).Infof("Daemon Controller has been deleted %v", key)
 | 
				
			||||||
 | 
							dm.expectations.DeleteExpectations(key)
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						dc := obj.(*expapi.DaemonSet)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Don't process a daemon until all its creations and deletions have been processed.
 | 
				
			||||||
 | 
						// For example if daemon foo asked for 3 new daemon pods in the previous call to manageDaemons,
 | 
				
			||||||
 | 
						// then we do not want to call manageDaemons on foo until the daemon pods have been created.
 | 
				
			||||||
 | 
						dcKey, err := controller.KeyFunc(dc)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						dcNeedsSync := dm.expectations.SatisfiedExpectations(dcKey)
 | 
				
			||||||
 | 
						if dcNeedsSync {
 | 
				
			||||||
 | 
							dm.manageDaemons(dc)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						dm.updateDaemonStatus(dc)
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										321
									
								
								pkg/controller/daemon/manager_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										321
									
								
								pkg/controller/daemon/manager_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,321 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package daemon
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/api/testapi"
 | 
				
			||||||
 | 
						client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/client/unversioned/cache"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/controller"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/expapi"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/securitycontext"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						simpleDaemonLabel  = map[string]string{"name": "simple-daemon", "type": "production"}
 | 
				
			||||||
 | 
						simpleDaemonLabel2 = map[string]string{"name": "simple-daemon", "type": "test"}
 | 
				
			||||||
 | 
						simpleNodeLabel    = map[string]string{"color": "blue", "speed": "fast"}
 | 
				
			||||||
 | 
						simpleNodeLabel2   = map[string]string{"color": "red", "speed": "fast"}
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type FakePodControl struct {
 | 
				
			||||||
 | 
						daemonSpec    []expapi.DaemonSet
 | 
				
			||||||
 | 
						deletePodName []string
 | 
				
			||||||
 | 
						lock          sync.Mutex
 | 
				
			||||||
 | 
						err           error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func init() {
 | 
				
			||||||
 | 
						api.ForTesting_ReferencesAllowBlankSelfLinks = true
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error {
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *expapi.DaemonSet, nodeName string) error {
 | 
				
			||||||
 | 
						f.lock.Lock()
 | 
				
			||||||
 | 
						defer f.lock.Unlock()
 | 
				
			||||||
 | 
						if f.err != nil {
 | 
				
			||||||
 | 
							return f.err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						f.daemonSpec = append(f.daemonSpec, *daemon)
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *FakePodControl) DeletePod(namespace string, podName string) error {
 | 
				
			||||||
 | 
						f.lock.Lock()
 | 
				
			||||||
 | 
						defer f.lock.Unlock()
 | 
				
			||||||
 | 
						if f.err != nil {
 | 
				
			||||||
 | 
							return f.err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						f.deletePodName = append(f.deletePodName, podName)
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					func (f *FakePodControl) clear() {
 | 
				
			||||||
 | 
						f.lock.Lock()
 | 
				
			||||||
 | 
						defer f.lock.Unlock()
 | 
				
			||||||
 | 
						f.deletePodName = []string{}
 | 
				
			||||||
 | 
						f.daemonSpec = []expapi.DaemonSet{}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newDaemon(name string) *expapi.DaemonSet {
 | 
				
			||||||
 | 
						return &expapi.DaemonSet{
 | 
				
			||||||
 | 
							TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
 | 
				
			||||||
 | 
							ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
 | 
								Name:      name,
 | 
				
			||||||
 | 
								Namespace: api.NamespaceDefault,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							Spec: expapi.DaemonSetSpec{
 | 
				
			||||||
 | 
								Selector: simpleDaemonLabel,
 | 
				
			||||||
 | 
								Template: &api.PodTemplateSpec{
 | 
				
			||||||
 | 
									ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
 | 
										Labels: simpleDaemonLabel,
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									Spec: api.PodSpec{
 | 
				
			||||||
 | 
										Containers: []api.Container{
 | 
				
			||||||
 | 
											{
 | 
				
			||||||
 | 
												Image: "foo/bar",
 | 
				
			||||||
 | 
												TerminationMessagePath: api.TerminationMessagePathDefault,
 | 
				
			||||||
 | 
												ImagePullPolicy:        api.PullIfNotPresent,
 | 
				
			||||||
 | 
												SecurityContext:        securitycontext.ValidSecurityContextWithContainerDefaults(),
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
										DNSPolicy: api.DNSDefault,
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newNode(name string, label map[string]string) *api.Node {
 | 
				
			||||||
 | 
						return &api.Node{
 | 
				
			||||||
 | 
							TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
 | 
				
			||||||
 | 
							ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
 | 
								Name:      name,
 | 
				
			||||||
 | 
								Labels:    label,
 | 
				
			||||||
 | 
								Namespace: api.NamespaceDefault,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string]string) {
 | 
				
			||||||
 | 
						for i := startIndex; i < startIndex+numNodes; i++ {
 | 
				
			||||||
 | 
							nodeStore.Add(newNode(fmt.Sprintf("node-%d", i), label))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newPod(podName string, nodeName string, label map[string]string) *api.Pod {
 | 
				
			||||||
 | 
						pod := &api.Pod{
 | 
				
			||||||
 | 
							TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
 | 
				
			||||||
 | 
							ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
 | 
								GenerateName: podName,
 | 
				
			||||||
 | 
								Labels:       label,
 | 
				
			||||||
 | 
								Namespace:    api.NamespaceDefault,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							Spec: api.PodSpec{
 | 
				
			||||||
 | 
								NodeName: nodeName,
 | 
				
			||||||
 | 
								Containers: []api.Container{
 | 
				
			||||||
 | 
									{
 | 
				
			||||||
 | 
										Image: "foo/bar",
 | 
				
			||||||
 | 
										TerminationMessagePath: api.TerminationMessagePathDefault,
 | 
				
			||||||
 | 
										ImagePullPolicy:        api.PullIfNotPresent,
 | 
				
			||||||
 | 
										SecurityContext:        securitycontext.ValidSecurityContextWithContainerDefaults(),
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								DNSPolicy: api.DNSDefault,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						api.GenerateName(api.SimpleNameGenerator, &pod.ObjectMeta)
 | 
				
			||||||
 | 
						return pod
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func addPods(podStore cache.Store, nodeName string, label map[string]string, number int) {
 | 
				
			||||||
 | 
						for i := 0; i < number; i++ {
 | 
				
			||||||
 | 
							podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func makeTestManager() (*DaemonManager, *FakePodControl) {
 | 
				
			||||||
 | 
						client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
 | 
				
			||||||
 | 
						manager := NewDaemonManager(client)
 | 
				
			||||||
 | 
						podControl := &FakePodControl{}
 | 
				
			||||||
 | 
						manager.podControl = podControl
 | 
				
			||||||
 | 
						return manager, podControl
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func validateSyncDaemons(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) {
 | 
				
			||||||
 | 
						if len(fakePodControl.daemonSpec) != expectedCreates {
 | 
				
			||||||
 | 
							t.Errorf("Unexpected number of creates.  Expected %d, saw %d\n", expectedCreates, len(fakePodControl.daemonSpec))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(fakePodControl.deletePodName) != expectedDeletes {
 | 
				
			||||||
 | 
							t.Errorf("Unexpected number of deletes.  Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodName))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func syncAndValidateDaemons(t *testing.T, manager *DaemonManager, daemon *expapi.DaemonSet, podControl *FakePodControl, expectedCreates, expectedDeletes int) {
 | 
				
			||||||
 | 
						key, err := controller.KeyFunc(daemon)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("Could not get key for daemon.")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						manager.syncHandler(key)
 | 
				
			||||||
 | 
						validateSyncDaemons(t, podControl, expectedCreates, expectedDeletes)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon without node selectors should launch pods on every node.
 | 
				
			||||||
 | 
					func TestSimpleDaemonLaunchesPods(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 5, nil)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 5, 0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon without node selectors should launch pods on every node.
 | 
				
			||||||
 | 
					func TestNoNodesDoesNothing(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon without node selectors should launch pods on every node.
 | 
				
			||||||
 | 
					func TestOneNodeDaemonLaunchesPod(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						manager.nodeStore.Add(newNode("only-node", nil))
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 1, 0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Manager should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods.
 | 
				
			||||||
 | 
					func TestDealsWithExistingPods(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 5, nil)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-1", simpleDaemonLabel, 1)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-2", simpleDaemonLabel, 2)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-3", simpleDaemonLabel, 5)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-4", simpleDaemonLabel2, 2)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 2, 5)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon with node selector should launch pods on nodes matching selector.
 | 
				
			||||||
 | 
					func TestSelectorDaemonLaunchesPods(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 4, nil)
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 3, 0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon with node selector should delete pods from nodes that do not satisfy selector.
 | 
				
			||||||
 | 
					func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 5, nil)
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-0", simpleDaemonLabel2, 2)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-1", simpleDaemonLabel, 3)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-1", simpleDaemonLabel2, 1)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-4", simpleDaemonLabel, 1)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 5, 4)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes.
 | 
				
			||||||
 | 
					func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 5, nil)
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-0", simpleDaemonLabel, 1)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-1", simpleDaemonLabel, 3)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-1", simpleDaemonLabel2, 2)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-2", simpleDaemonLabel, 4)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-6", simpleDaemonLabel, 13)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-7", simpleDaemonLabel2, 4)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-9", simpleDaemonLabel, 1)
 | 
				
			||||||
 | 
						addPods(manager.podStore.Store, "node-9", simpleDaemonLabel2, 1)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 3, 20)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon with node selector which does not match any node labels should not launch pods.
 | 
				
			||||||
 | 
					func TestBadSelectorDaemonDoesNothing(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 4, nil)
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel2
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon with node name should launch pod on node with corresponding name.
 | 
				
			||||||
 | 
					func TestNameDaemonLaunchesPods(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 5, nil)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeName = "node-0"
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 1, 0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon with node name that does not exist should not launch pods.
 | 
				
			||||||
 | 
					func TestBadNameDaemonDoesNothing(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 5, nil)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeName = "node-10"
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon with node selector, and node name, matching a node, should launch a pod on the node.
 | 
				
			||||||
 | 
					func TestNameAndSelectorDaemonLaunchesPods(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 4, nil)
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeName = "node-6"
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 1, 0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Daemon with node selector that matches some nodes, and node name that matches a different node, should do nothing.
 | 
				
			||||||
 | 
					func TestInconsistentNameSelectorDaemonDoesNothing(t *testing.T) {
 | 
				
			||||||
 | 
						manager, podControl := makeTestManager()
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 0, 4, nil)
 | 
				
			||||||
 | 
						addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
 | 
				
			||||||
 | 
						daemon := newDaemon("foo")
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
 | 
				
			||||||
 | 
						daemon.Spec.Template.Spec.NodeName = "node-0"
 | 
				
			||||||
 | 
						manager.dcStore.Add(daemon)
 | 
				
			||||||
 | 
						syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -31,6 +31,7 @@ import (
 | 
				
			|||||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
						client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
 | 
						"k8s.io/kubernetes/pkg/client/unversioned/testclient"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/controller"
 | 
						"k8s.io/kubernetes/pkg/controller"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/expapi"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/labels"
 | 
						"k8s.io/kubernetes/pkg/labels"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
						"k8s.io/kubernetes/pkg/runtime"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/securitycontext"
 | 
						"k8s.io/kubernetes/pkg/securitycontext"
 | 
				
			||||||
@@ -69,6 +70,10 @@ func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationCo
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *expapi.DaemonSet, nodeName string) error {
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FakePodControl) DeletePod(namespace string, podName string) error {
 | 
					func (f *FakePodControl) DeletePod(namespace string, podName string) error {
 | 
				
			||||||
	f.lock.Lock()
 | 
						f.lock.Lock()
 | 
				
			||||||
	defer f.lock.Unlock()
 | 
						defer f.lock.Unlock()
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user