/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package executor

import (
	"bytes"
	"encoding/json"
	"fmt"
	"math"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_1"

	"github.com/fsouza/go-dockerclient"
	"github.com/gogo/protobuf/proto"
	log "github.com/golang/glog"
	bindings "github.com/mesos/mesos-go/executor"
	mesos "github.com/mesos/mesos-go/mesosproto"
	mutil "github.com/mesos/mesos-go/mesosutil"
	"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
	"k8s.io/kubernetes/contrib/mesos/pkg/node"
	"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
	"k8s.io/kubernetes/pkg/api"
	apierrors "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/dockertools"
	kruntime "k8s.io/kubernetes/pkg/runtime"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

const (
	containerPollTime = 1 * time.Second
	lostPodPollTime   = 1 * time.Minute
	podRelistPeriod   = 5 * time.Minute
)

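// stateType describes the lifecycle state of the executor; all reads and
// transitions are performed atomically so it can be checked from multiple
// goroutines.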
type stateType int32

const (
	disconnectedState stateType = iota
	connectedState
	suicidalState
	terminalState
)

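// get atomically reads the current state.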
func (s *stateType) get() stateType {
	return stateType(atomic.LoadInt32((*int32)(s)))
}

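// transition atomically changes the state from `from` to `to`, returning
// true if the swap succeeded.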
func (s *stateType) transition(from, to stateType) bool {
	return atomic.CompareAndSwapInt32((*int32)(s), int32(from), int32(to))
}

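// transitionTo atomically sets the state to `to` unless the current state
// matches one of the `unless` states, in which case it returns false.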
func (s *stateType) transitionTo(to stateType, unless ...stateType) bool {
	if len(unless) == 0 {
		atomic.StoreInt32((*int32)(s), int32(to))
		return true
	}
	for {
		state := s.get()
		for _, x := range unless {
			if state == x {
				return false
			}
		}
		if s.transition(state, to) {
			return true
		}
	}
}

// Executor is a Mesos executor that runs pods on the Mesos slave (minion)
// machine on which it is launched.
type Executor struct {
	state                stateType
	lock                 sync.Mutex
	terminate            chan struct{}                     // signals that the executor is shutting down
	outgoing             chan func() (mesos.Status, error) // outgoing queue to the mesos driver
	dockerClient         dockertools.DockerInterface
	suicideWatch         suicideWatcher
	suicideTimeout       time.Duration
	shutdownAlert        func()          // invoked just prior to executor shutdown
	kubeletFinished      <-chan struct{} // signals that kubelet Run() died
	exitFunc             func(int)
	staticPodsConfigPath string
	launchGracePeriod    time.Duration
	nodeInfos            chan<- NodeInfo
	initCompleted        chan struct{} // closes upon completion of Init()
	registry             Registry
	watcher              *watcher
	kubeAPI              kubeAPI
	nodeAPI              nodeAPI
}

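// Config collects the dependencies and settings needed to build an
// Executor via New.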
type Config struct {
	APIClient            *clientset.Clientset
	Docker               dockertools.DockerInterface
	ShutdownAlert        func()
	SuicideTimeout       time.Duration
	KubeletFinished      <-chan struct{} // signals that kubelet Run() died
	ExitFunc             func(int)
	StaticPodsConfigPath string
	LaunchGracePeriod    time.Duration
	NodeInfos            chan<- NodeInfo
	Registry             Registry
}

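// isConnected reports whether the executor is currently registered
// (connected) with a Mesos slave.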
func (k *Executor) isConnected() bool {
	return connectedState == (&k.state).get()
}

// New creates a new kubernetes executor.
func New(config Config) *Executor {
	launchGracePeriod := config.LaunchGracePeriod
	if launchGracePeriod == 0 {
		// this is the equivalent of saying "the timer never expires" and simplifies nil
		// timer checks elsewhere in the code. it's a little hacky but less code to
		// maintain than alternative approaches.
		launchGracePeriod = time.Duration(math.MaxInt64)
	}
	k := &Executor{
		state:                disconnectedState,
		terminate:            make(chan struct{}),
		outgoing:             make(chan func() (mesos.Status, error), 1024),
		dockerClient:         config.Docker,
		suicideTimeout:       config.SuicideTimeout,
		kubeletFinished:      config.KubeletFinished,
		suicideWatch:         &suicideTimer{},
		shutdownAlert:        config.ShutdownAlert,
		exitFunc:             config.ExitFunc,
		staticPodsConfigPath: config.StaticPodsConfigPath,
		launchGracePeriod:    launchGracePeriod,
		nodeInfos:            config.NodeInfos,
		initCompleted:        make(chan struct{}),
		registry:             config.Registry,
		kubeAPI:              &clientAPIWrapper{config.APIClient},
		nodeAPI:              &clientAPIWrapper{config.APIClient},
	}

	runtime.On(k.initCompleted, k.runSendLoop)

	k.watcher = newWatcher(k.registry.watch())
	runtime.On(k.initCompleted, k.watcher.run)

	return k
}

// Done returns a chan that closes when the executor is shutting down
func (k *Executor) Done() <-chan struct{} {
	return k.terminate
}

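// Init completes executor setup: it removes stale kubelet-managed docker
// containers, arms the suicide watch, and installs a pod-event filter that
// kills pod-tasks on incompatible updates and acknowledges deleted pods
// with a TASK_KILLED status.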
func (k *Executor) Init(driver bindings.ExecutorDriver) {
	defer close(k.initCompleted)

	k.killKubeletContainers()
	k.resetSuicideWatch(driver)

	k.watcher.addFilter(func(podEvent *PodEvent) bool {
		switch podEvent.eventType {
		case PodEventIncompatibleUpdate:
			log.Warningf("killing %s because of an incompatible update", podEvent.FormatShort())
			k.killPodTask(driver, podEvent.taskID)
			// halt processing of this event; when the pod is deleted we'll receive another
			// event for that.
			return false

		case PodEventDeleted:
			// an active pod-task was deleted, alert mesos:
			// send back a TASK_KILLED status, we completed the pod-task lifecycle normally.
			k.resetSuicideWatch(driver)
			k.sendStatus(driver, newStatus(mutil.NewTaskID(podEvent.taskID), mesos.TaskState_TASK_KILLED, "pod-deleted"))
		}
		return true
	})

	//TODO(jdef) monitor kubeletFinished and shutdown if it happens
}

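// isDone reports whether the executor has begun shutting down.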
func (k *Executor) isDone() bool {
	select {
	case <-k.terminate:
		return true
	default:
		return false
	}
}

// Registered is called when the executor is successfully registered with the slave.
func (k *Executor) Registered(
	driver bindings.ExecutorDriver,
	executorInfo *mesos.ExecutorInfo,
	frameworkInfo *mesos.FrameworkInfo,
	slaveInfo *mesos.SlaveInfo,
) {
	if k.isDone() {
		return
	}

	log.Infof(
		"Executor %v of framework %v registered with slave %v\n",
		executorInfo, frameworkInfo, slaveInfo,
	)

	if !(&k.state).transition(disconnectedState, connectedState) {
		log.Errorf("failed to register/transition to a connected state")
	}

	if executorInfo != nil && executorInfo.Data != nil {
		err := k.initializeStaticPodsSource(slaveInfo.GetHostname(), executorInfo.Data)
		if err != nil {
			log.Errorf("failed to initialize static pod configuration: %v", err)
		}
	}

	annotations, err := annotationsFor(executorInfo)
	if err != nil {
		log.Errorf(
			"cannot get node annotations from executor info %v error %v",
			executorInfo, err,
		)
	}

	if slaveInfo != nil {
		_, err := k.nodeAPI.createOrUpdate(
			slaveInfo.GetHostname(),
			node.SlaveAttributesToLabels(slaveInfo.Attributes),
			annotations,
		)

		if err != nil {
			log.Errorf("cannot update node labels: %v", err)
		}
	}

	k.lock.Lock()
	defer k.lock.Unlock()

	if slaveInfo != nil && k.nodeInfos != nil {
		k.nodeInfos <- nodeInfo(slaveInfo, executorInfo) // leave it behind the upper lock to avoid panics
	}
}

// Reregistered is called when the executor is successfully re-registered with the slave.
// This can happen when the slave fails over.
func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos.SlaveInfo) {
	if k.isDone() {
		return
	}
	log.Infof("Reregistered with slave %v\n", slaveInfo)
	if !(&k.state).transition(disconnectedState, connectedState) {
		log.Errorf("failed to reregister/transition to a connected state")
	}

	if slaveInfo != nil {
		_, err := k.nodeAPI.createOrUpdate(
			slaveInfo.GetHostname(),
			node.SlaveAttributesToLabels(slaveInfo.Attributes),
			nil, // don't change annotations
		)

		if err != nil {
			log.Errorf("cannot update node labels: %v", err)
		}
	}

	if slaveInfo != nil && k.nodeInfos != nil {
		// make sure nodeInfos is not nil and send new NodeInfo
		k.lock.Lock()
		defer k.lock.Unlock()
		if k.isDone() {
			return
		}
		k.nodeInfos <- nodeInfo(slaveInfo, nil)
	}
}

// initializeStaticPodsSource unzips the data slice into the static-pods directory
func (k *Executor) initializeStaticPodsSource(hostname string, data []byte) error {
	log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
	// annotate the pod with BindingHostKey so that the scheduler will ignore the pod
	// once it appears in the pod registry. the stock kubelet sets the pod host in order
	// to accomplish the same; we do this because the k8sm scheduler works differently.
	annotator := podutil.Annotator(map[string]string{
		meta.BindingHostKey: hostname,
	})
	return podutil.WriteToDir(annotator.Do(podutil.Gunzip(data)), k.staticPodsConfigPath)
}

// Disconnected is called when the executor is disconnected from the slave.
func (k *Executor) Disconnected(driver bindings.ExecutorDriver) {
	if k.isDone() {
		return
	}
	log.Infof("Slave is disconnected\n")
	if !(&k.state).transition(connectedState, disconnectedState) {
		log.Errorf("failed to disconnect/transition to a disconnected state")
	}
}

// LaunchTask is called when the executor receives a request to launch a task.
// This happens when the k8sm scheduler has decided to schedule the pod
// (which corresponds to a Mesos Task) onto the node where this executor
// is running, but the binding is not recorded in the Kubernetes store yet.
// This function is invoked to tell the executor to record the binding in the
// Kubernetes store and start the pod via the Kubelet.
func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.TaskInfo) {
	if k.isDone() {
		return
	}

	log.Infof("Launch task %v\n", taskInfo)

	taskID := taskInfo.GetTaskId().GetValue()
	if p := k.registry.pod(taskID); p != nil {
		log.Warningf("task %v already launched", taskID)
		// don't send back TASK_RUNNING or TASK_FAILED here: this may be a
		// duplicated message.
		return
	}

	if !k.isConnected() {
		log.Errorf("Ignore launch task because the executor is disconnected\n")
		k.sendStatus(driver, newStatus(taskInfo.GetTaskId(), mesos.TaskState_TASK_FAILED,
			messages.ExecutorUnregistered))
		return
	}

	obj, err := kruntime.Decode(api.Codecs.UniversalDecoder(), taskInfo.GetData())
	if err != nil {
		log.Errorf("failed to extract yaml data from the taskInfo.data %v", err)
		k.sendStatus(driver, newStatus(taskInfo.GetTaskId(), mesos.TaskState_TASK_FAILED,
			messages.UnmarshalTaskDataFailure))
		return
	}
	pod, ok := obj.(*api.Pod)
	if !ok {
		log.Errorf("expected *api.Pod instead of %T: %+v", obj, obj)
		k.sendStatus(driver, newStatus(taskInfo.GetTaskId(), mesos.TaskState_TASK_FAILED,
			messages.UnmarshalTaskDataFailure))
		return
	}

	k.resetSuicideWatch(driver)

	// run the next step async because it calls out to apiserver and we don't want to block here
	go k.bindAndWatchTask(driver, taskInfo, time.NewTimer(k.launchGracePeriod), pod)
}

// determine whether we need to start a suicide countdown. if so, then start
// a timer that, upon expiration, causes this executor to commit suicide.
// this implementation runs asynchronously. callers that wish to wait for the
// reset to complete may wait for the returned signal chan to close.
func (k *Executor) resetSuicideWatch(driver bindings.ExecutorDriver) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		k.lock.Lock()
		defer k.lock.Unlock()

		if k.suicideTimeout < 1 {
			return
		}

		if k.suicideWatch != nil {
			if !k.registry.empty() {
				k.suicideWatch.Stop()
				return
			}
			if k.suicideWatch.Reset(k.suicideTimeout) {
				// valid timer, reset was successful
				return
			}
		}

		//TODO(jdef) reduce verbosity here once we're convinced that suicide watch is working properly
		log.Infof("resetting suicide watch timer for %v", k.suicideTimeout)

		k.suicideWatch = k.suicideWatch.Next(k.suicideTimeout, driver, jumper(k.attemptSuicide))
	}()
	return ch
}

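// attemptSuicide shuts the executor down if the suicide watch fired and no
// tasks are registered; the attempt is abandoned if it was aborted via the
// abort chan or if any tasks are still present.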
func (k *Executor) attemptSuicide(driver bindings.ExecutorDriver, abort <-chan struct{}) {
	k.lock.Lock()
	defer k.lock.Unlock()

	// this attempt may have been queued and since been aborted
	select {
	case <-abort:
		//TODO(jdef) reduce verbosity once suicide watch is working properly
		log.Infof("aborting suicide attempt since watch was cancelled")
		return
	default: // continue
	}

	// fail-safe, will abort kamikaze attempts if there are tasks
	if !k.registry.empty() {
		log.Errorf("suicide attempt failed, there are still running tasks")
		return
	}

	log.Infoln("Attempting suicide")
	if (&k.state).transitionTo(suicidalState, suicidalState, terminalState) {
		//TODO(jdef) let the scheduler know?
		//TODO(jdef) is suicide more graceful than slave-demanded shutdown?
		k.doShutdown(driver)
	}
}

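// podStatusData marshals the given pod status into a PodStatusResult and
// returns the JSON bytes along with the pod's full name.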
func podStatusData(pod *api.Pod, status api.PodStatus) ([]byte, string, error) {
	podFullName := container.GetPodFullName(pod)
	data, err := json.Marshal(api.PodStatusResult{
		ObjectMeta: api.ObjectMeta{
			Name:     podFullName,
			SelfLink: "/podstatusresult",
		},
		Status: status,
	})
	return data, podFullName, err
}

// async continuation of LaunchTask
func (k *Executor) bindAndWatchTask(driver bindings.ExecutorDriver, task *mesos.TaskInfo, launchTimer *time.Timer, pod *api.Pod) {
	success := false
	defer func() {
		if !success {
			k.killPodTask(driver, task.TaskId.GetValue())
			k.resetSuicideWatch(driver)
		}
	}()

	// allow a recently failed-over scheduler the chance to recover the task/pod binding:
	// it may have failed and recovered before the apiserver is able to report the updated
	// binding information. replays of this status event will signal to the scheduler that
	// the apiserver should be up-to-date.
	startingData, _, err := podStatusData(pod, api.PodStatus{})
	if err != nil {
		log.Errorf("failed to generate pod-task starting data for task %v pod %v/%v: %v",
			task.TaskId.GetValue(), pod.Namespace, pod.Name, err)
		k.sendStatus(driver, newStatus(task.TaskId, mesos.TaskState_TASK_FAILED, err.Error()))
		return
	}

	err = k.registry.bind(task.TaskId.GetValue(), pod)
	if err != nil {
		log.Errorf("failed to bind task %v pod %v/%v: %v",
			task.TaskId.GetValue(), pod.Namespace, pod.Name, err)
		k.sendStatus(driver, newStatus(task.TaskId, mesos.TaskState_TASK_FAILED, err.Error()))
		return
	}

	// send TASK_STARTING
	k.sendStatus(driver, &mesos.TaskStatus{
		TaskId:  task.TaskId,
		State:   mesos.TaskState_TASK_STARTING.Enum(),
		Message: proto.String(messages.CreateBindingSuccess),
		Data:    startingData,
	})

	// within the launch timeout window we should see a pod-task update via the registry.
	// if we see a Running update then we need to generate a TASK_RUNNING status update for mesos.
	handlerFinished := false
	handler := watchHandler{
		expiration: watchExpiration{
			timeout: launchTimer.C,
			onEvent: func(taskID string) {
				if !handlerFinished {
					// launch timeout expired
					k.killPodTask(driver, task.TaskId.GetValue())
				}
			},
		},
		onEvent: func(podEvent *PodEvent) (bool, error) {
			switch podEvent.eventType {
			case PodEventUpdated:
				log.V(2).Infof("Found status: '%v' for %s", podEvent.pod.Status, podEvent.FormatShort())

				if podEvent.pod.Status.Phase != api.PodRunning {
					// still waiting for pod to transition to a running state, so
					// we're not done monitoring yet; check back later..
					break
				}

				data, podFullName, err := podStatusData(podEvent.pod, podEvent.pod.Status)
				if err != nil {
					return false, fmt.Errorf("failed to marshal pod status result: %v", err)
				}

				defer k.sendStatus(driver, &mesos.TaskStatus{
					TaskId:  task.TaskId,
					State:   mesos.TaskState_TASK_RUNNING.Enum(),
					Message: proto.String("pod-running:" + podFullName),
					Data:    data,
				})
				fallthrough

			case PodEventDeleted:
				// we're done monitoring because pod has been deleted
				handlerFinished = true
				launchTimer.Stop()
			}
			return handlerFinished, nil
		},
	}
	k.watcher.forTask(task.TaskId.GetValue(), handler)
	success = true
}

// KillTask is called when the executor receives a request to kill a task.
func (k *Executor) KillTask(driver bindings.ExecutorDriver, taskId *mesos.TaskID) {
	k.killPodTask(driver, taskId.GetValue())
}

// deletes the pod and task associated with the task identified by taskID and sends a task
// status update to mesos. also attempts to reset the suicide watch.
func (k *Executor) killPodTask(driver bindings.ExecutorDriver, taskID string) {
	pod := k.registry.pod(taskID)
	if pod == nil {
		log.V(1).Infof("Failed to remove task, unknown task %v\n", taskID)
		k.sendStatus(driver, newStatus(&mesos.TaskID{Value: &taskID}, mesos.TaskState_TASK_LOST, "kill-pod-task"))
		return
	}

	// force-delete the pod from the API server
	// TODO(jdef) possibly re-use eviction code from stock k8s once it lands?
	err := k.kubeAPI.killPod(pod.Namespace, pod.Name)
	if err != nil {
		log.V(1).Infof("failed to delete task %v pod %v/%v from apiserver: %+v", taskID, pod.Namespace, pod.Name, err)
		if apierrors.IsNotFound(err) {
			k.sendStatus(driver, newStatus(&mesos.TaskID{Value: &taskID}, mesos.TaskState_TASK_LOST, "kill-pod-task"))
		}
	}
}

// FrameworkMessage is called when the framework sends some message to the executor
func (k *Executor) FrameworkMessage(driver bindings.ExecutorDriver, message string) {
	if k.isDone() {
		return
	}
	if !k.isConnected() {
		log.Warningf("Ignore framework message because the executor is disconnected\n")
		return
	}

	log.Infof("Received message from framework %v\n", message)
	//TODO(jdef) master reported a lost task, reconcile this! @see framework.go:handleTaskLost
	if strings.HasPrefix(message, messages.TaskLost+":") {
		taskId := message[len(messages.TaskLost)+1:]
		if taskId != "" {
			// TODO(jdef) would it make more sense to check the status of the task and
			// just replay the last non-terminal message that we sent if the task is
			// still active?

			// clean up pod state
			k.sendStatus(driver, newStatus(&mesos.TaskID{Value: &taskId}, mesos.TaskState_TASK_LOST, messages.TaskLostAck))
			k.killPodTask(driver, taskId)
		}
		return
	}

	switch message {
	case messages.Kamikaze:
		k.attemptSuicide(driver, nil)
	}
}

// Shutdown is called when the executor receives a shutdown request.
func (k *Executor) Shutdown(driver bindings.ExecutorDriver) {
	k.lock.Lock()
	defer k.lock.Unlock()
	k.doShutdown(driver)
}

// assumes that caller has obtained state lock
func (k *Executor) doShutdown(driver bindings.ExecutorDriver) {
	defer func() {
		log.Errorf("exiting with unclean shutdown: %v", recover())
		if k.exitFunc != nil {
			k.exitFunc(1)
		}
	}()

	(&k.state).transitionTo(terminalState)

	// signal to all listeners that this KubeletExecutor is done!
	close(k.terminate)
	close(k.nodeInfos)

	if k.shutdownAlert != nil {
		func() {
			// HandleCrash only recovers a panic when deferred
			defer utilruntime.HandleCrash()
			k.shutdownAlert()
		}()
	}

	log.Infoln("Stopping executor driver")
	_, err := driver.Stop()
	if err != nil {
		log.Warningf("failed to stop executor driver: %v", err)
	}

	log.Infoln("Shutdown the executor")

	// according to docs, mesos will generate TASK_LOST updates for us
	// if needed, so don't take extra time to do that here.
	k.registry.shutdown()

	select {
	// the main Run() func may still be running... wait for it to finish: it will
	// clear the pod configuration cleanly, telling k8s "there are no pods" and
	// clean up resources (pods, volumes, etc).
	case <-k.kubeletFinished:

	//TODO(jdef) attempt to wait for events to propagate to API server?

	// TODO(jdef) extract constant, should be smaller than whatever the
	// slave graceful shutdown timeout period is.
	case <-time.After(15 * time.Second):
		log.Errorf("timed out waiting for kubelet Run() to die")
	}
	log.Infoln("exiting")
	if k.exitFunc != nil {
		k.exitFunc(0)
	}
}

// Destroy existing k8s containers
func (k *Executor) killKubeletContainers() {
	if containers, err := dockertools.GetKubeletDockerContainers(k.dockerClient, true); err == nil {
		opts := docker.RemoveContainerOptions{
			RemoveVolumes: true,
			Force:         true,
		}
		for _, container := range containers {
			opts.ID = container.ID
			log.V(2).Infof("Removing container: %v", opts.ID)
			if err := k.dockerClient.RemoveContainer(opts); err != nil {
				log.Warning(err)
			}
		}
	} else {
		log.Warningf("Failed to list kubelet docker containers: %v", err)
	}
}

// Error is called when some error happens.
func (k *Executor) Error(driver bindings.ExecutorDriver, message string) {
	log.Errorln(message)
}

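// newStatus builds a mesos.TaskStatus for the given task, state and message.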
func newStatus(taskId *mesos.TaskID, state mesos.TaskState, message string) *mesos.TaskStatus {
	return &mesos.TaskStatus{
		TaskId:  taskId,
		State:   &state,
		Message: proto.String(message),
	}
}

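// sendStatus queues a task status update for delivery by the send loop,
// unless the executor is terminating.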
func (k *Executor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) {
	select {
	case <-k.terminate:
	default:
		k.outgoing <- func() (mesos.Status, error) { return driver.SendStatusUpdate(status) }
	}
}

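// sendFrameworkMessage queues a framework message for delivery by the send
// loop, unless the executor is terminating.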
func (k *Executor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) {
	select {
	case <-k.terminate:
	default:
		k.outgoing <- func() (mesos.Status, error) { return driver.SendFrameworkMessage(msg) }
	}
}

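// runSendLoop delivers queued driver calls (status updates and framework
// messages) while the executor is connected, re-queueing a call if it fails
// and giving up only on driver abort or executor termination.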
func (k *Executor) runSendLoop() {
	defer log.V(1).Info("sender loop exiting")
	for {
		select {
		case <-k.terminate:
			return
		default:
			if !k.isConnected() {
				select {
				case <-k.terminate:
				case <-time.After(1 * time.Second):
				}
				continue
			}
			sender, ok := <-k.outgoing
			if !ok {
				// programming error
				panic("someone closed the outgoing channel")
			}
			if status, err := sender(); err == nil {
				continue
			} else {
				log.Error(err)
				if status == mesos.Status_DRIVER_ABORTED {
					return
				}
			}
			// attempt to re-queue the sender
			select {
			case <-k.terminate:
			case k.outgoing <- sender:
			}
		}
	}
}

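// annotationsFor derives node annotations (executor id and encoded executor
// resources) from the given ExecutorInfo.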
func annotationsFor(ei *mesos.ExecutorInfo) (annotations map[string]string, err error) {
	annotations = map[string]string{}
	if ei == nil {
		return
	}

	var buf bytes.Buffer
	if err = executorinfo.EncodeResources(&buf, ei.GetResources()); err != nil {
		return
	}

	annotations[meta.ExecutorIdKey] = ei.GetExecutorId().GetValue()
	annotations[meta.ExecutorResourcesKey] = buf.String()

	return
}