mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Node controller supports disabling node probes.
Node controller supports disabling sending node probes and updating node statuses. Controlled by --sync_node_status flag. Resolves #4565.
This commit is contained in:
		@@ -206,7 +206,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
 | 
			
		||||
	nodeResources := &api.NodeResources{}
 | 
			
		||||
 | 
			
		||||
	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
 | 
			
		||||
	nodeController.Run(5*time.Second, true)
 | 
			
		||||
	nodeController.Run(5*time.Second, true, true)
 | 
			
		||||
 | 
			
		||||
	// Kubelet (localhost)
 | 
			
		||||
	testRootDir := makeTempDirOrDie("kubelet_integ_1.")
 | 
			
		||||
 
 | 
			
		||||
@@ -54,6 +54,7 @@ type CMServer struct {
 | 
			
		||||
	RegisterRetryCount      int
 | 
			
		||||
	MachineList             util.StringList
 | 
			
		||||
	SyncNodeList            bool
 | 
			
		||||
	SyncNodeStatus          bool
 | 
			
		||||
	PodEvictionTimeout      time.Duration
 | 
			
		||||
 | 
			
		||||
	// TODO: Discover these by pinging the host machines, and rip out these params.
 | 
			
		||||
@@ -75,6 +76,7 @@ func NewCMServer() *CMServer {
 | 
			
		||||
		NodeMilliCPU:            1000,
 | 
			
		||||
		NodeMemory:              resource.MustParse("3Gi"),
 | 
			
		||||
		SyncNodeList:            true,
 | 
			
		||||
		SyncNodeStatus:          true,
 | 
			
		||||
		KubeletConfig: client.KubeletConfig{
 | 
			
		||||
			Port:        ports.KubeletPort,
 | 
			
		||||
			EnableHttps: false,
 | 
			
		||||
@@ -100,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 | 
			
		||||
		"The number of retries for initial node registration.  Retry interval equals node_sync_period.")
 | 
			
		||||
	fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
 | 
			
		||||
	fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.")
 | 
			
		||||
	fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controler send probes to kubelets and update NodeStatus.")
 | 
			
		||||
	// TODO: Discover these by pinging the host machines, and rip out these flags.
 | 
			
		||||
	// TODO: in the meantime, use resource.QuantityFlag() instead of these
 | 
			
		||||
	fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")
 | 
			
		||||
@@ -158,7 +161,7 @@ func (s *CMServer) Run(_ []string) error {
 | 
			
		||||
 | 
			
		||||
	nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
 | 
			
		||||
		kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
 | 
			
		||||
	nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
 | 
			
		||||
	nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus)
 | 
			
		||||
 | 
			
		||||
	resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
 | 
			
		||||
	resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
 | 
			
		||||
 
 | 
			
		||||
@@ -127,7 +127,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
 | 
			
		||||
	kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
 | 
			
		||||
 | 
			
		||||
	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
 | 
			
		||||
	nodeController.Run(10*time.Second, true)
 | 
			
		||||
	nodeController.Run(10*time.Second, true, true)
 | 
			
		||||
 | 
			
		||||
	endpoints := service.NewEndpointController(cl)
 | 
			
		||||
	go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
 | 
			
		||||
 
 | 
			
		||||
@@ -77,7 +77,7 @@ func NewNodeController(
 | 
			
		||||
 | 
			
		||||
// Run creates the initial node list and starts syncing instances from the cloud provider, if any.
 | 
			
		||||
// It also starts syncing cluster node status.
 | 
			
		||||
func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
 | 
			
		||||
func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) {
 | 
			
		||||
	// Register initial set of nodes with their status set.
 | 
			
		||||
	var nodes *api.NodeList
 | 
			
		||||
	var err error
 | 
			
		||||
@@ -96,7 +96,6 @@ func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
 | 
			
		||||
			glog.Errorf("Error loading initial static nodes: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	nodes = s.DoChecks(nodes)
 | 
			
		||||
	nodes, err = s.PopulateIPs(nodes)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Error getting nodes ips: %v", err)
 | 
			
		||||
@@ -114,12 +113,21 @@ func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
 | 
			
		||||
		}, period)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if syncNodeStatus {
 | 
			
		||||
		// Start syncing node status.
 | 
			
		||||
		go util.Forever(func() {
 | 
			
		||||
			if err = s.SyncNodeStatus(); err != nil {
 | 
			
		||||
				glog.Errorf("Error syncing status: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
		}, period)
 | 
			
		||||
	} else {
 | 
			
		||||
		// Start checking node reachability and evicting timeouted pods.
 | 
			
		||||
		go util.Forever(func() {
 | 
			
		||||
			if err = s.EvictTimeoutedPods(); err != nil {
 | 
			
		||||
				glog.Errorf("Error evicting timeouted pods: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
		}, period)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RegisterNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
 | 
			
		||||
@@ -216,6 +224,33 @@ func (s *NodeController) SyncNodeStatus() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// EvictTimeoutedPods verifies if nodes are reachable by checking the time of last probe
 | 
			
		||||
// and deletes pods from not reachable nodes.
 | 
			
		||||
func (s *NodeController) EvictTimeoutedPods() error {
 | 
			
		||||
	nodes, err := s.kubeClient.Nodes().List()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	for _, node := range nodes.Items {
 | 
			
		||||
		if util.Now().After(latestReadyTime(&node).Add(s.podEvictionTimeout)) {
 | 
			
		||||
			s.deletePods(node.Name)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func latestReadyTime(node *api.Node) util.Time {
 | 
			
		||||
	readyTime := node.ObjectMeta.CreationTimestamp
 | 
			
		||||
	for _, condition := range node.Status.Conditions {
 | 
			
		||||
		if condition.Type == api.NodeReady &&
 | 
			
		||||
			condition.Status == api.ConditionFull &&
 | 
			
		||||
			condition.LastProbeTime.After(readyTime.Time) {
 | 
			
		||||
			readyTime = condition.LastProbeTime
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return readyTime
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PopulateIPs queries IPs for given list of nodes.
 | 
			
		||||
func (s *NodeController) PopulateIPs(nodes *api.NodeList) (*api.NodeList, error) {
 | 
			
		||||
	if s.isRunningCloudProvider() {
 | 
			
		||||
 
 | 
			
		||||
@@ -709,6 +709,118 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestEvictTimeoutedPods(t *testing.T) {
 | 
			
		||||
	table := []struct {
 | 
			
		||||
		fakeNodeHandler      *FakeNodeHandler
 | 
			
		||||
		expectedRequestCount int
 | 
			
		||||
		expectedActions      []client.FakeAction
 | 
			
		||||
	}{
 | 
			
		||||
		// Node created long time ago, with no status.
 | 
			
		||||
		{
 | 
			
		||||
			fakeNodeHandler: &FakeNodeHandler{
 | 
			
		||||
				Existing: []*api.Node{
 | 
			
		||||
					{
 | 
			
		||||
						ObjectMeta: api.ObjectMeta{
 | 
			
		||||
							Name:              "node0",
 | 
			
		||||
							CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
				Fake: client.Fake{
 | 
			
		||||
					PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expectedRequestCount: 1, // List
 | 
			
		||||
			expectedActions:      []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
 | 
			
		||||
		},
 | 
			
		||||
		// Node created recently, with no status.
 | 
			
		||||
		{
 | 
			
		||||
			fakeNodeHandler: &FakeNodeHandler{
 | 
			
		||||
				Existing: []*api.Node{
 | 
			
		||||
					{
 | 
			
		||||
						ObjectMeta: api.ObjectMeta{
 | 
			
		||||
							Name:              "node0",
 | 
			
		||||
							CreationTimestamp: util.Now(),
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
				Fake: client.Fake{
 | 
			
		||||
					PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expectedRequestCount: 1, // List
 | 
			
		||||
			expectedActions:      nil,
 | 
			
		||||
		},
 | 
			
		||||
		// Node created long time ago, with status updated long time ago.
 | 
			
		||||
		{
 | 
			
		||||
			fakeNodeHandler: &FakeNodeHandler{
 | 
			
		||||
				Existing: []*api.Node{
 | 
			
		||||
					{
 | 
			
		||||
						ObjectMeta: api.ObjectMeta{
 | 
			
		||||
							Name:              "node0",
 | 
			
		||||
							CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
 | 
			
		||||
						},
 | 
			
		||||
						Status: api.NodeStatus{
 | 
			
		||||
							Conditions: []api.NodeCondition{
 | 
			
		||||
								{
 | 
			
		||||
									Type:          api.NodeReady,
 | 
			
		||||
									Status:        api.ConditionFull,
 | 
			
		||||
									LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC),
 | 
			
		||||
								},
 | 
			
		||||
							},
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
				Fake: client.Fake{
 | 
			
		||||
					PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expectedRequestCount: 1, // List
 | 
			
		||||
			expectedActions:      []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
 | 
			
		||||
		},
 | 
			
		||||
		// Node created long time ago, with status updated recently.
 | 
			
		||||
		{
 | 
			
		||||
			fakeNodeHandler: &FakeNodeHandler{
 | 
			
		||||
				Existing: []*api.Node{
 | 
			
		||||
					{
 | 
			
		||||
						ObjectMeta: api.ObjectMeta{
 | 
			
		||||
							Name:              "node0",
 | 
			
		||||
							CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
 | 
			
		||||
						},
 | 
			
		||||
						Status: api.NodeStatus{
 | 
			
		||||
							Conditions: []api.NodeCondition{
 | 
			
		||||
								{
 | 
			
		||||
									Type:          api.NodeReady,
 | 
			
		||||
									Status:        api.ConditionFull,
 | 
			
		||||
									LastProbeTime: util.Now(),
 | 
			
		||||
								},
 | 
			
		||||
							},
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
				Fake: client.Fake{
 | 
			
		||||
					PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expectedRequestCount: 1, // List
 | 
			
		||||
			expectedActions:      nil,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, item := range table {
 | 
			
		||||
		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
 | 
			
		||||
		if err := nodeController.EvictTimeoutedPods(); err != nil {
 | 
			
		||||
			t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
 | 
			
		||||
			t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
 | 
			
		||||
		}
 | 
			
		||||
		if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) {
 | 
			
		||||
			t.Errorf("actions differs, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSyncNodeStatusDeletePods(t *testing.T) {
 | 
			
		||||
	table := []struct {
 | 
			
		||||
		fakeNodeHandler      *FakeNodeHandler
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user