Reorder scheduler packages
- Scheduler implementation -> components
- types -> toplevel scheduler package
- integration_test.go -> integration
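The new layout is visible in the import block of the moved test below; a minimal sketch of the relevant import paths (taken verbatim from this file, with orientation comments added here based on the bullet list above):

    import (
        "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"                          // toplevel scheduler types
        "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components"               // scheduler implementation
        "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework"
        "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/schedulerloop"
    )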
822 contrib/mesos/pkg/scheduler/integration/integration_test.go Normal file
@@ -0,0 +1,822 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
    "encoding/json"
    "fmt"
    "net/http"
    "net/http/httptest"
    "sync"
    "testing"
    "time"

    log "github.com/golang/glog"
    mesos "github.com/mesos/mesos-go/mesosproto"
    "github.com/mesos/mesos-go/mesosutil"
    bindings "github.com/mesos/mesos-go/scheduler"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"
    assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert"
    "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/schedulerloop"
    schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
    mmock "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/mock"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
    mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/testapi"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/client/cache"
    client "k8s.io/kubernetes/pkg/client/unversioned"
    "k8s.io/kubernetes/pkg/runtime"
    "k8s.io/kubernetes/pkg/util"
    "k8s.io/kubernetes/pkg/watch"
)

// An apiserver mock which partially mocks the pods API
type TestServer struct {
    stats map[string]uint
    nodes map[string]*api.Node
    lock  sync.Mutex // guards above fields

    server *httptest.Server
    t      *testing.T
}

func (srv *TestServer) LookupNode(name string) *api.Node {
    srv.lock.Lock()
    defer srv.lock.Unlock()

    node, _ := api.Scheme.DeepCopy(srv.nodes[name])
    return node.(*api.Node)
}

func (srv *TestServer) WaitForNode(name string) {
    assertext.EventuallyTrue(srv.t, util.ForeverTestTimeout, func() bool {
        return srv.LookupNode(name) != nil
    })
}

func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsListWatch) *TestServer {
    ts := TestServer{
        stats: map[string]uint{},
        nodes: map[string]*api.Node{},
        t:     t,
    }
    mux := http.NewServeMux()

    podListHandler := func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        pods := mockPodListWatch.Pods()
        w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), &pods)))
    }
    mux.HandleFunc(testapi.Default.ResourcePath("pods", namespace, ""), podListHandler)
    mux.HandleFunc(testapi.Default.ResourcePath("pods", "", ""), podListHandler)

    podsPrefix := testapi.Default.ResourcePath("pods", namespace, "") + "/"
    mux.HandleFunc(podsPrefix, func(w http.ResponseWriter, r *http.Request) {
        name := r.URL.Path[len(podsPrefix):]

        // update statistics for this pod
        ts.lock.Lock()
        defer ts.lock.Unlock()
        ts.stats[name] = ts.stats[name] + 1

        p := mockPodListWatch.Pod(name)
        if p != nil {
            w.WriteHeader(http.StatusOK)
            w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), p)))
            return
        }
        w.WriteHeader(http.StatusNotFound)
    })

    mux.HandleFunc(
        testapi.Default.ResourcePath("events", namespace, ""),
        func(w http.ResponseWriter, r *http.Request) {
            w.WriteHeader(http.StatusOK)
        },
    )

    mux.HandleFunc(
        testapi.Default.ResourcePath("nodes", "", ""),
        func(w http.ResponseWriter, r *http.Request) {
            var node api.Node
            if err := json.NewDecoder(r.Body).Decode(&node); err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                return
            }

            ts.lock.Lock()
            defer ts.lock.Unlock()
            ts.nodes[node.Name] = &node

            if err := json.NewEncoder(w).Encode(node); err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                return
            }
            w.WriteHeader(http.StatusOK)
        },
    )

    mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
        t.Errorf("unexpected request: %v", req.RequestURI)
        res.WriteHeader(http.StatusNotFound)
    })

    ts.server = httptest.NewServer(mux)
    return &ts
}

func (ts *TestServer) Stats(name string) uint {
    ts.lock.Lock()
    defer ts.lock.Unlock()

    return ts.stats[name]
}

// Create mock of pods ListWatch, usually listening on the apiserver pods watch endpoint
type MockPodsListWatch struct {
    ListWatch   cache.ListWatch
    fakeWatcher *watch.FakeWatcher
    list        api.PodList
    lock        sync.Mutex
}

func NewMockPodsListWatch(initialPodList api.PodList) *MockPodsListWatch {
    lw := MockPodsListWatch{
        fakeWatcher: watch.NewFake(),
        list:        initialPodList,
    }
    lw.ListWatch = cache.ListWatch{
        WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
            return lw.fakeWatcher, nil
        },
        ListFunc: func() (runtime.Object, error) {
            lw.lock.Lock()
            defer lw.lock.Unlock()

            listCopy, err := api.Scheme.DeepCopy(&lw.list)
            return listCopy.(*api.PodList), err
        },
    }
    return &lw
}

func (lw *MockPodsListWatch) Pods() api.PodList {
    lw.lock.Lock()
    defer lw.lock.Unlock()

    obj, _ := api.Scheme.DeepCopy(&lw.list)
    return *(obj.(*api.PodList))
}

func (lw *MockPodsListWatch) Pod(name string) *api.Pod {
    lw.lock.Lock()
    defer lw.lock.Unlock()

    for _, p := range lw.list.Items {
        if p.Name == name {
            return &p
        }
    }

    return nil
}
func (lw *MockPodsListWatch) Add(pod *api.Pod, notify bool) {
    func() {
        lw.lock.Lock()
        defer lw.lock.Unlock()
        lw.list.Items = append(lw.list.Items, *pod)
    }()

    if notify {
        lw.fakeWatcher.Add(pod)
    }
}
func (lw *MockPodsListWatch) Modify(pod *api.Pod, notify bool) {
    found := false
    func() {
        lw.lock.Lock()
        defer lw.lock.Unlock()

        for i, otherPod := range lw.list.Items {
            if otherPod.Name == pod.Name {
                lw.list.Items[i] = *pod
                found = true
                return
            }
        }
        log.Fatalf("Cannot find pod %v to modify in MockPodsListWatch", pod.Name)
    }()

    if notify && found {
        lw.fakeWatcher.Modify(pod)
    }
}

func (lw *MockPodsListWatch) Delete(pod *api.Pod, notify bool) {
    var notifyPod *api.Pod
    func() {
        lw.lock.Lock()
        defer lw.lock.Unlock()

        for i, otherPod := range lw.list.Items {
            if otherPod.Name == pod.Name {
                lw.list.Items = append(lw.list.Items[:i], lw.list.Items[i+1:]...)
                notifyPod = &otherPod
                return
            }
        }
        log.Fatalf("Cannot find pod %v to delete in MockPodsListWatch", pod.Name)
    }()

    if notifyPod != nil && notify {
        lw.fakeWatcher.Delete(notifyPod)
    }
}

// Create a pod with a given index, requiring one port
var currentPodNum int = 0

func NewTestPod() (*api.Pod, int) {
    currentPodNum = currentPodNum + 1
    name := fmt.Sprintf("pod%d", currentPodNum)
    return &api.Pod{
        TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.Version()},
        ObjectMeta: api.ObjectMeta{
            Name:      name,
            Namespace: api.NamespaceDefault,
            SelfLink:  fmt.Sprintf("http://1.2.3.4/api/v1beta1/pods/%s", name),
        },
        Spec: api.PodSpec{
            Containers: []api.Container{
                {
                    Ports: []api.ContainerPort{
                        {
                            ContainerPort: 8000 + currentPodNum,
                            Protocol:      api.ProtocolTCP,
                        },
                    },
                },
            },
        },
        Status: api.PodStatus{
            PodIP: fmt.Sprintf("1.2.3.%d", 4+currentPodNum),
            Conditions: []api.PodCondition{
                {
                    Type:   api.PodReady,
                    Status: api.ConditionTrue,
                },
            },
        },
    }, currentPodNum
}

// Offering some cpus and memory and the 8000-9000 port range
func NewTestOffer(id string) *mesos.Offer {
    hostname := "some_hostname"
    cpus := mesosutil.NewScalarResource("cpus", 3.75)
    mem := mesosutil.NewScalarResource("mem", 940)
    var port8000 uint64 = 8000
    var port9000 uint64 = 9000
    ports8000to9000 := mesos.Value_Range{Begin: &port8000, End: &port9000}
    ports := mesosutil.NewRangesResource("ports", []*mesos.Value_Range{&ports8000to9000})
    return &mesos.Offer{
        Id:        mesosutil.NewOfferID(id),
        Hostname:  &hostname,
        SlaveId:   mesosutil.NewSlaveID(hostname),
        Resources: []*mesos.Resource{cpus, mem, ports},
    }
}

// Add assertions to reason about event streams
type Event struct {
    Object  runtime.Object
    Reason  string
    Message string
}

type EventPredicate func(e Event) bool

type EventAssertions struct {
    assert.Assertions
}

// EventObserver implements record.EventRecorder for the purposes of validation via EventAssertions.
type EventObserver struct {
    fifo chan Event
}

func NewEventObserver() *EventObserver {
    return &EventObserver{
        fifo: make(chan Event, 1000),
    }
}

func (o *EventObserver) Event(object runtime.Object, reason, message string) {
    o.fifo <- Event{Object: object, Reason: reason, Message: message}
}

func (o *EventObserver) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
    o.fifo <- Event{Object: object, Reason: reason, Message: fmt.Sprintf(messageFmt, args...)}
}
func (o *EventObserver) PastEventf(object runtime.Object, timestamp unversioned.Time, reason, messageFmt string, args ...interface{}) {
    o.fifo <- Event{Object: object, Reason: reason, Message: fmt.Sprintf(messageFmt, args...)}
}

func (a *EventAssertions) Event(observer *EventObserver, pred EventPredicate, msgAndArgs ...interface{}) bool {
    // parse msgAndArgs: first possibly a duration, otherwise a format string with further args
    timeout := util.ForeverTestTimeout
    msg := "event not received"
    msgArgStart := 0
    if len(msgAndArgs) > 0 {
        switch msgAndArgs[0].(type) {
        case time.Duration:
            timeout = msgAndArgs[0].(time.Duration)
            msgArgStart += 1
        }
    }
    if len(msgAndArgs) > msgArgStart {
        msg = fmt.Sprintf(msgAndArgs[msgArgStart].(string), msgAndArgs[msgArgStart+1:]...)
    }

    // watch events
    result := make(chan bool)
    stop := make(chan struct{})
    go func() {
        for {
            select {
            case e, ok := <-observer.fifo:
                if !ok {
                    result <- false
                    return
                } else if pred(e) {
                    log.V(3).Infof("found asserted event for reason '%v': %v", e.Reason, e.Message)
                    result <- true
                    return
                } else {
                    log.V(5).Infof("ignoring not-asserted event for reason '%v': %v", e.Reason, e.Message)
                }
            case _, ok := <-stop:
                if !ok {
                    return
                }
            }
        }
    }()
    defer close(stop)

    // wait for watch to match or timeout
    select {
    case matched := <-result:
        return matched
    case <-time.After(timeout):
        return a.Fail(msg)
    }
}

func (a *EventAssertions) EventWithReason(observer *EventObserver, reason string, msgAndArgs ...interface{}) bool {
    return a.Event(observer, func(e Event) bool {
        return e.Reason == reason
    }, msgAndArgs...)
}

// Create mesos.TaskStatus for a given task
func newTaskStatusForTask(task *mesos.TaskInfo, state mesos.TaskState) *mesos.TaskStatus {
    healthy := state == mesos.TaskState_TASK_RUNNING
    ts := float64(time.Now().Nanosecond()) / 1000000000.0
    source := mesos.TaskStatus_SOURCE_EXECUTOR
    return &mesos.TaskStatus{
        TaskId:     task.TaskId,
        State:      &state,
        SlaveId:    task.SlaveId,
        ExecutorId: task.Executor.ExecutorId,
        Timestamp:  &ts,
        Healthy:    &healthy,
        Source:     &source,
        Data:       task.Data,
    }
}

type LaunchedTask struct {
    offerId  mesos.OfferID
    taskInfo *mesos.TaskInfo
}

type lifecycleTest struct {
    apiServer     *TestServer
    driver        *mmock.JoinableDriver
    eventObs      *EventObserver
    podsListWatch *MockPodsListWatch
    framework     framework.Framework
    schedulerProc *ha.SchedulerProcess
    sched         scheduler.Scheduler
    t             *testing.T
}

func newLifecycleTest(t *testing.T) lifecycleTest {
    assert := &EventAssertions{*assert.New(t)}

    // create a fake pod watch. We use that below to submit new pods to the scheduler
    podsListWatch := NewMockPodsListWatch(api.PodList{})

    // create fake apiserver
    apiServer := NewTestServer(t, api.NamespaceDefault, podsListWatch)

    // create ExecutorInfo with some data for static pods if set
    ei := mesosutil.NewExecutorInfo(
        mesosutil.NewExecutorID("executor-id"),
        mesosutil.NewCommandInfo("executor-cmd"),
    )
    ei.Data = []byte{0, 1, 2}

    // create framework
    client := client.NewOrDie(&client.Config{
        Host:    apiServer.server.URL,
        Version: testapi.Default.Version(),
    })
    c := *schedcfg.CreateDefaultConfig()
    framework := framework.New(framework.Config{
        Executor:        ei,
        Client:          client,
        SchedulerConfig: c,
        LookupNode:      apiServer.LookupNode,
    })

    // TODO(sttts): re-enable the following tests
    // assert.NotNil(framework.client, "client is nil")
    // assert.NotNil(framework.executor, "executor is nil")
    // assert.NotNil(framework.offers, "offer registry is nil")

    // create pod scheduler
    strategy := podschedulers.NewAllocationStrategy(
        podtask.NewDefaultPredicate(
            mresource.DefaultDefaultContainerCPULimit,
            mresource.DefaultDefaultContainerMemLimit,
        ),
        podtask.NewDefaultProcurement(
            mresource.DefaultDefaultContainerCPULimit,
            mresource.DefaultDefaultContainerMemLimit,
        ),
    )
    fcfs := podschedulers.NewFCFSPodScheduler(strategy, apiServer.LookupNode)

    // create scheduler process
    schedulerProc := ha.New(framework)

    // create scheduler
    eventObs := NewEventObserver()
    scheduler := components.NewScheduler(&c, framework, fcfs, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
    assert.NotNil(scheduler)

    // create mock mesos scheduler driver
    driver := &mmock.JoinableDriver{}

    return lifecycleTest{
        apiServer:     apiServer,
        driver:        driver,
        eventObs:      eventObs,
        podsListWatch: podsListWatch,
        framework:     framework,
        schedulerProc: schedulerProc,
        sched:         scheduler,
        t:             t,
    }
}

func (lt lifecycleTest) Start() <-chan LaunchedTask {
    assert := &EventAssertions{*assert.New(lt.t)}
    lt.sched.Run(lt.schedulerProc.Terminal())

    // init framework
    err := lt.framework.Init(
        lt.sched,
        lt.schedulerProc.Master(),
        http.DefaultServeMux,
    )
    assert.NoError(err)

    lt.driver.On("Start").Return(mesos.Status_DRIVER_RUNNING, nil).Once()
    started := lt.driver.Upon()

    lt.driver.On("ReconcileTasks",
        mock.AnythingOfType("[]*mesosproto.TaskStatus"),
    ).Return(mesos.Status_DRIVER_RUNNING, nil)

    lt.driver.On("SendFrameworkMessage",
        mock.AnythingOfType("*mesosproto.ExecutorID"),
        mock.AnythingOfType("*mesosproto.SlaveID"),
        mock.AnythingOfType("string"),
    ).Return(mesos.Status_DRIVER_RUNNING, nil)

    launchedTasks := make(chan LaunchedTask, 1)
    launchTasksFunc := func(args mock.Arguments) {
        offerIDs := args.Get(0).([]*mesos.OfferID)
        taskInfos := args.Get(1).([]*mesos.TaskInfo)
        assert.Equal(1, len(offerIDs))
        assert.Equal(1, len(taskInfos))

        launchedTasks <- LaunchedTask{
            offerId:  *offerIDs[0],
            taskInfo: taskInfos[0],
        }
    }

    lt.driver.On("LaunchTasks",
        mock.AnythingOfType("[]*mesosproto.OfferID"),
        mock.AnythingOfType("[]*mesosproto.TaskInfo"),
        mock.AnythingOfType("*mesosproto.Filters"),
    ).Return(mesos.Status_DRIVER_RUNNING, nil).Run(launchTasksFunc)

    lt.driver.On("DeclineOffer",
        mock.AnythingOfType("*mesosproto.OfferID"),
        mock.AnythingOfType("*mesosproto.Filters"),
    ).Return(mesos.Status_DRIVER_RUNNING, nil)

    // elect master with mock driver
    driverFactory := ha.DriverFactory(func() (bindings.SchedulerDriver, error) {
        return lt.driver, nil
    })
    lt.schedulerProc.Elect(driverFactory)
    elected := lt.schedulerProc.Elected()

    // driver will be started
    <-started

    // tell scheduler to be registered
    lt.framework.Registered(
        lt.driver,
        mesosutil.NewFrameworkID("kubernetes-id"),
        mesosutil.NewMasterInfo("master-id", (192<<24)+(168<<16)+(0<<8)+1, 5050),
    )

    // wait for being elected
    <-elected
    return launchedTasks
}

func (lt lifecycleTest) Close() {
    lt.apiServer.server.Close()
}

func (lt lifecycleTest) End() <-chan struct{} {
    return lt.schedulerProc.End()
}

// TestScheduler_LifeCycle creates a scheduler plugin with the config returned by the scheduler,
// and plays through the whole life cycle of the plugin while creating pods, deleting
// and failing them.
func TestScheduler_LifeCycle(t *testing.T) {
    assert := &EventAssertions{*assert.New(t)}
    lt := newLifecycleTest(t)
    defer lt.Close()

    // run plugin
    launchedTasks := lt.Start()
    defer lt.End()

    // fake new, unscheduled pod
    pod, i := NewTestPod()
    lt.podsListWatch.Add(pod, true) // notify watchers

    // wait for failedScheduling event because there is no offer
    assert.EventWithReason(lt.eventObs, schedulerloop.FailedScheduling, "failedScheduling event not received")

    // add some matching offer
    offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
    lt.framework.ResourceOffers(nil, offers)

    // first offer is declined because node is not available yet
    lt.apiServer.WaitForNode("some_hostname")

    // add one more offer
    lt.framework.ResourceOffers(nil, offers)

    // and wait for scheduled pod
    assert.EventWithReason(lt.eventObs, schedulerloop.Scheduled)
    select {
    case launchedTask := <-launchedTasks:
        // report back that the task has been staged, and then started by mesos
        lt.framework.StatusUpdate(
            lt.driver,
            newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING),
        )

        lt.framework.StatusUpdate(
            lt.driver,
            newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING),
        )

        // check that ExecutorInfo.data has the static pod data
        assert.Len(launchedTask.taskInfo.Executor.Data, 3)

        // report back that the task has been lost
        lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0)

        lt.framework.StatusUpdate(
            lt.driver,
            newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST),
        )

        // and wait that framework message is sent to executor
        lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1)

    case <-time.After(util.ForeverTestTimeout):
        t.Fatalf("timed out waiting for launchTasks call")
    }

    offeredNodes := make(map[string]struct{})

    // Launch a pod and wait until the scheduler driver is called
    schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
        // wait for failedScheduling event because there is no offer
        assert.EventWithReason(lt.eventObs, schedulerloop.FailedScheduling, "failedScheduling event not received")

        // supply a matching offer
        lt.framework.ResourceOffers(lt.driver, offers)
        for _, offer := range offers {
            if _, ok := offeredNodes[offer.GetHostname()]; !ok {
                offeredNodes[offer.GetHostname()] = struct{}{}
                lt.apiServer.WaitForNode(offer.GetHostname())

                // reoffer since it must have been declined above
                lt.framework.ResourceOffers(lt.driver, []*mesos.Offer{offer})
            }
        }

        // and wait to get scheduled
        assert.EventWithReason(lt.eventObs, schedulerloop.Scheduled)

        // wait for driver.launchTasks call
        select {
        case launchedTask := <-launchedTasks:
            for _, offer := range offers {
                if offer.Id.GetValue() == launchedTask.offerId.GetValue() {
                    return pod, &launchedTask, offer
                }
            }
            t.Fatalf("unknown offer used to start a pod")
            return nil, nil, nil
        case <-time.After(util.ForeverTestTimeout):
            t.Fatal("timed out waiting for launchTasks")
            return nil, nil, nil
        }
    }

    // Launch a pod and wait until the scheduler driver is called
    launchPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
        lt.podsListWatch.Add(pod, true)
        return schedulePodWithOffers(pod, offers)
    }

    // Launch a pod, wait until the scheduler driver is called and report back that it is running
    startPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
        // notify about pod, offer resources and wait for scheduling
        pod, launchedTask, offer := launchPodWithOffers(pod, offers)
        if pod != nil {
            // report back status
            lt.framework.StatusUpdate(
                lt.driver,
                newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING),
            )
            lt.framework.StatusUpdate(
                lt.driver,
                newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING),
            )

            return pod, launchedTask, offer
        }

        return nil, nil, nil
    }

    startTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) {
        pod, i := NewTestPod()
        offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
        return startPodWithOffers(pod, offers)
    }

    // start another pod
    pod, launchedTask, _ := startTestPod()

    // mock driver.KillTask, should be invoked when a pod is deleted
    lt.driver.On("KillTask",
        mock.AnythingOfType("*mesosproto.TaskID"),
    ).Return(mesos.Status_DRIVER_RUNNING, nil).Run(func(args mock.Arguments) {
        killedTaskId := *(args.Get(0).(*mesos.TaskID))
        assert.Equal(*launchedTask.taskInfo.TaskId, killedTaskId, "expected same TaskID as during launch")
    })
    killTaskCalled := lt.driver.Upon()

    // stop it again via the apiserver mock
    lt.podsListWatch.Delete(pod, true) // notify watchers

    // and wait for the driver killTask call with the correct TaskId
    select {
    case <-killTaskCalled:
        // report back that the task is finished
        lt.framework.StatusUpdate(
            lt.driver,
            newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED),
        )

    case <-time.After(util.ForeverTestTimeout):
        t.Fatal("timed out waiting for KillTask")
    }

    // start a pod on a given NodeName and check that it is scheduled to the right host
    pod, i = NewTestPod()
    pod.Spec.NodeName = "hostname1"
    offers = []*mesos.Offer{}
    for j := 0; j < 3; j++ {
        offer := NewTestOffer(fmt.Sprintf("offer%d_%d", i, j))
        hostname := fmt.Sprintf("hostname%d", j)
        offer.Hostname = &hostname
        offers = append(offers, offer)
    }

    _, _, usedOffer := startPodWithOffers(pod, offers)

    assert.Equal(offers[1].Id.GetValue(), usedOffer.Id.GetValue())
    assert.Equal(pod.Spec.NodeName, *usedOffer.Hostname)

    lt.framework.OfferRescinded(lt.driver, offers[0].Id)
    lt.framework.OfferRescinded(lt.driver, offers[2].Id)

    // start pods:
    // - which are failing while binding,
    // - leading to reconciliation
    // - with different states on the apiserver

    failPodFromExecutor := func(task *mesos.TaskInfo) {
        beforePodLookups := lt.apiServer.Stats(pod.Name)
        status := newTaskStatusForTask(task, mesos.TaskState_TASK_FAILED)
        message := messages.CreateBindingFailure
        status.Message = &message
        lt.framework.StatusUpdate(lt.driver, status)

        // wait until pod is looked up at the apiserver
        assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
            return lt.apiServer.Stats(pod.Name) == beforePodLookups+1
        }, "expect that reconcileTask will access apiserver for pod %v", pod.Name)
    }

    launchTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) {
        pod, i := NewTestPod()
        offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
        return launchPodWithOffers(pod, offers)
    }

    // 1. with pod deleted from the apiserver
    // expected: pod is removed from internal task registry
    pod, launchedTask, _ = launchTestPod()
    lt.podsListWatch.Delete(pod, false) // not notifying the watchers
    failPodFromExecutor(launchedTask.taskInfo)

    podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
    assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
        t, _ := lt.sched.Tasks().ForPod(podKey)
        return t == nil
    })

    // 2. with pod still on the apiserver, not bound
    // expected: pod is rescheduled
    pod, launchedTask, _ = launchTestPod()
    failPodFromExecutor(launchedTask.taskInfo)

    retryOffers := []*mesos.Offer{NewTestOffer("retry-offer")}
    schedulePodWithOffers(pod, retryOffers)

    // 3. with pod still on the apiserver, bound, notified via ListWatch
    // expected: nothing, pod updates not supported, compare ReconcileTask function
    pod, launchedTask, usedOffer = startTestPod()
    pod.Annotations = map[string]string{
        meta.BindingHostKey: *usedOffer.Hostname,
    }
    pod.Spec.NodeName = *usedOffer.Hostname
    lt.podsListWatch.Modify(pod, true) // notifying the watchers
    time.Sleep(time.Second / 2)
    failPodFromExecutor(launchedTask.taskInfo)
}