mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #7098 from yifan-gu/handler
kubelet: Refactor runHandler().
This commit is contained in:
		@@ -22,32 +22,52 @@ import (
 | 
			
		||||
	"strconv"
 | 
			
		||||
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type execActionHandler struct {
 | 
			
		||||
	kubelet *Kubelet
 | 
			
		||||
type HandlerRunner interface {
 | 
			
		||||
	Run(containerID string, pod *api.Pod, container *api.Container, handler *api.Handler) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (e *execActionHandler) Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error {
 | 
			
		||||
	_, err := e.kubelet.RunInContainer(podFullName, uid, container.Name, handler.Exec.Command)
 | 
			
		||||
type handlerRunner struct {
 | 
			
		||||
	httpGetter       httpGetter
 | 
			
		||||
	commandRunner    dockertools.ContainerCommandRunner
 | 
			
		||||
	containerManager *dockertools.DockerManager
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO(yifan): Merge commandRunner and containerManager once containerManager implements the ContainerCommandRunner interface.
 | 
			
		||||
func NewHandlerRunner(httpGetter httpGetter, commandRunner dockertools.ContainerCommandRunner, containerManager *dockertools.DockerManager) *handlerRunner {
 | 
			
		||||
	return &handlerRunner{
 | 
			
		||||
		httpGetter:       httpGetter,
 | 
			
		||||
		commandRunner:    commandRunner,
 | 
			
		||||
		containerManager: containerManager,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO(yifan): Use a strong type for containerID.
 | 
			
		||||
func (hr *handlerRunner) Run(containerID string, pod *api.Pod, container *api.Container, handler *api.Handler) error {
 | 
			
		||||
	switch {
 | 
			
		||||
	case handler.Exec != nil:
 | 
			
		||||
		_, err := hr.commandRunner.RunInContainer(containerID, handler.Exec.Command)
 | 
			
		||||
		return err
 | 
			
		||||
	case handler.HTTPGet != nil:
 | 
			
		||||
		return hr.runHTTPHandler(pod, container, handler)
 | 
			
		||||
	default:
 | 
			
		||||
		err := fmt.Errorf("Invalid handler: %v", handler)
 | 
			
		||||
		glog.Errorf("Cannot run handler: %v", err)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type httpActionHandler struct {
 | 
			
		||||
	kubelet *Kubelet
 | 
			
		||||
	client  httpGetter
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ResolvePort attempts to turn a IntOrString port reference into a concrete port number.
 | 
			
		||||
// resolvePort attempts to turn a IntOrString port reference into a concrete port number.
 | 
			
		||||
// If portReference has an int value, it is treated as a literal, and simply returns that value.
 | 
			
		||||
// If portReference is a string, an attempt is first made to parse it as an integer.  If that fails,
 | 
			
		||||
// an attempt is made to find a port with the same name in the container spec.
 | 
			
		||||
// If a port with the same name is found, it's ContainerPort value is returned.  If no matching
 | 
			
		||||
// port is found, an error is returned.
 | 
			
		||||
func ResolvePort(portReference util.IntOrString, container *api.Container) (int, error) {
 | 
			
		||||
func resolvePort(portReference util.IntOrString, container *api.Container) (int, error) {
 | 
			
		||||
	if portReference.Kind == util.IntstrInt {
 | 
			
		||||
		return portReference.IntVal, nil
 | 
			
		||||
	} else {
 | 
			
		||||
@@ -66,10 +86,10 @@ func ResolvePort(portReference util.IntOrString, container *api.Container) (int,
 | 
			
		||||
	return -1, fmt.Errorf("couldn't find port: %v in %v", portReference, container)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpActionHandler) Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error {
 | 
			
		||||
func (hr *handlerRunner) runHTTPHandler(pod *api.Pod, container *api.Container, handler *api.Handler) error {
 | 
			
		||||
	host := handler.HTTPGet.Host
 | 
			
		||||
	if len(host) == 0 {
 | 
			
		||||
		status, err := h.kubelet.GetPodStatus(podFullName)
 | 
			
		||||
		status, err := hr.containerManager.GetPodStatus(pod)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Errorf("Unable to get pod info, event handlers may be invalid.")
 | 
			
		||||
			return err
 | 
			
		||||
@@ -84,12 +104,12 @@ func (h *httpActionHandler) Run(podFullName string, uid types.UID, container *ap
 | 
			
		||||
		port = 80
 | 
			
		||||
	} else {
 | 
			
		||||
		var err error
 | 
			
		||||
		port, err = ResolvePort(handler.HTTPGet.Port, container)
 | 
			
		||||
		port, err = resolvePort(handler.HTTPGet.Port, container)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	url := fmt.Sprintf("http://%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), handler.HTTPGet.Path)
 | 
			
		||||
	_, err := h.client.Get(url)
 | 
			
		||||
	_, err := hr.httpGetter.Get(url)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -25,7 +25,7 @@ import (
 | 
			
		||||
 | 
			
		||||
func TestResolvePortInt(t *testing.T) {
 | 
			
		||||
	expected := 80
 | 
			
		||||
	port, err := ResolvePort(util.IntOrString{Kind: util.IntstrInt, IntVal: expected}, &api.Container{})
 | 
			
		||||
	port, err := resolvePort(util.IntOrString{Kind: util.IntstrInt, IntVal: expected}, &api.Container{})
 | 
			
		||||
	if port != expected {
 | 
			
		||||
		t.Errorf("expected: %d, saw: %d", expected, port)
 | 
			
		||||
	}
 | 
			
		||||
@@ -42,7 +42,7 @@ func TestResolvePortString(t *testing.T) {
 | 
			
		||||
			{Name: name, ContainerPort: expected},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	port, err := ResolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
 | 
			
		||||
	port, err := resolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
 | 
			
		||||
	if port != expected {
 | 
			
		||||
		t.Errorf("expected: %d, saw: %d", expected, port)
 | 
			
		||||
	}
 | 
			
		||||
@@ -59,7 +59,7 @@ func TestResolvePortStringUnknown(t *testing.T) {
 | 
			
		||||
			{Name: "bar", ContainerPort: expected},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	port, err := ResolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
 | 
			
		||||
	port, err := resolvePort(util.IntOrString{Kind: util.IntstrString, StrVal: name}, container)
 | 
			
		||||
	if port != -1 {
 | 
			
		||||
		t.Errorf("expected: -1, saw: %d", port)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -233,6 +233,7 @@ func NewMainKubelet(
 | 
			
		||||
 | 
			
		||||
	klet.podManager = newBasicPodManager(klet.kubeClient)
 | 
			
		||||
	klet.prober = NewProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
 | 
			
		||||
	klet.handlerRunner = NewHandlerRunner(klet.httpClient, klet.runner, klet.containerManager)
 | 
			
		||||
 | 
			
		||||
	runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -317,6 +318,10 @@ type Kubelet struct {
 | 
			
		||||
 | 
			
		||||
	// Healthy check prober.
 | 
			
		||||
	prober *Prober
 | 
			
		||||
 | 
			
		||||
	// Container lifecycle handler runner.
 | 
			
		||||
	handlerRunner HandlerRunner
 | 
			
		||||
 | 
			
		||||
	// Container readiness state manager.
 | 
			
		||||
	readinessManager *kubecontainer.ReadinessManager
 | 
			
		||||
 | 
			
		||||
@@ -597,31 +602,6 @@ func makeBinds(container *api.Container, podVolumes volumeMap) []string {
 | 
			
		||||
	return binds
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// A basic interface that knows how to execute handlers
 | 
			
		||||
type actionHandler interface {
 | 
			
		||||
	Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (kl *Kubelet) newActionHandler(handler *api.Handler) actionHandler {
 | 
			
		||||
	switch {
 | 
			
		||||
	case handler.Exec != nil:
 | 
			
		||||
		return &execActionHandler{kubelet: kl}
 | 
			
		||||
	case handler.HTTPGet != nil:
 | 
			
		||||
		return &httpActionHandler{client: kl.httpClient, kubelet: kl}
 | 
			
		||||
	default:
 | 
			
		||||
		glog.Errorf("Invalid handler: %v", handler)
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (kl *Kubelet) runHandler(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error {
 | 
			
		||||
	actionHandler := kl.newActionHandler(handler)
 | 
			
		||||
	if actionHandler == nil {
 | 
			
		||||
		return fmt.Errorf("invalid handler")
 | 
			
		||||
	}
 | 
			
		||||
	return actionHandler.Run(podFullName, uid, container, handler)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// generateRunContainerOptions generates the RunContainerOptions, which can be used by
 | 
			
		||||
// the container runtime to set parameters for launching a container.
 | 
			
		||||
func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) {
 | 
			
		||||
@@ -677,7 +657,7 @@ func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolum
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
 | 
			
		||||
		handlerErr := kl.runHandler(kubecontainer.GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
 | 
			
		||||
		handlerErr := kl.handlerRunner.Run(id, pod, container, container.Lifecycle.PostStart)
 | 
			
		||||
		if handlerErr != nil {
 | 
			
		||||
			kl.killContainerByID(id)
 | 
			
		||||
			return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
 | 
			
		||||
 
 | 
			
		||||
@@ -114,6 +114,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 | 
			
		||||
		fakeRecorder)
 | 
			
		||||
	kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
 | 
			
		||||
	kubelet.prober = NewProber(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
 | 
			
		||||
	kubelet.handlerRunner = NewHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager)
 | 
			
		||||
 | 
			
		||||
	return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
 | 
			
		||||
}
 | 
			
		||||
@@ -767,6 +768,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
 | 
			
		||||
	waitGroup := testKubelet.waitGroup
 | 
			
		||||
	fakeHttp := fakeHTTP{}
 | 
			
		||||
	kubelet.httpClient = &fakeHttp
 | 
			
		||||
	kubelet.handlerRunner = NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
 | 
			
		||||
	pods := []*api.Pod{
 | 
			
		||||
		{
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{
 | 
			
		||||
@@ -1692,6 +1694,7 @@ func TestRunHandlerExec(t *testing.T) {
 | 
			
		||||
	kubelet := testKubelet.kubelet
 | 
			
		||||
	fakeDocker := testKubelet.fakeDocker
 | 
			
		||||
	kubelet.runner = &fakeCommandRunner
 | 
			
		||||
	kubelet.handlerRunner = NewHandlerRunner(&fakeHTTP{}, kubelet.runner, kubelet.containerManager)
 | 
			
		||||
 | 
			
		||||
	containerID := "abc1234"
 | 
			
		||||
	podName := "podFoo"
 | 
			
		||||
@@ -1715,7 +1718,12 @@ func TestRunHandlerExec(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	err := kubelet.runHandler(podName+"_"+podNamespace, "", &container, container.Lifecycle.PostStart)
 | 
			
		||||
 | 
			
		||||
	pod := api.Pod{}
 | 
			
		||||
	pod.ObjectMeta.Name = podName
 | 
			
		||||
	pod.ObjectMeta.Namespace = podNamespace
 | 
			
		||||
	pod.Spec.Containers = []api.Container{container}
 | 
			
		||||
	err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -1741,7 +1749,9 @@ func TestRunHandlerHttp(t *testing.T) {
 | 
			
		||||
	testKubelet := newTestKubelet(t)
 | 
			
		||||
	kubelet := testKubelet.kubelet
 | 
			
		||||
	kubelet.httpClient = &fakeHttp
 | 
			
		||||
	kubelet.handlerRunner = NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
 | 
			
		||||
 | 
			
		||||
	containerID := "abc1234"
 | 
			
		||||
	podName := "podFoo"
 | 
			
		||||
	podNamespace := "nsFoo"
 | 
			
		||||
	containerName := "containerFoo"
 | 
			
		||||
@@ -1758,7 +1768,12 @@ func TestRunHandlerHttp(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	err := kubelet.runHandler(podName+"_"+podNamespace, "", &container, container.Lifecycle.PostStart)
 | 
			
		||||
	pod := api.Pod{}
 | 
			
		||||
	pod.ObjectMeta.Name = podName
 | 
			
		||||
	pod.ObjectMeta.Namespace = podNamespace
 | 
			
		||||
	pod.Spec.Containers = []api.Container{container}
 | 
			
		||||
	err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -1767,35 +1782,28 @@ func TestRunHandlerHttp(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestNewHandler(t *testing.T) {
 | 
			
		||||
func TestRunHandlerNil(t *testing.T) {
 | 
			
		||||
	testKubelet := newTestKubelet(t)
 | 
			
		||||
	kubelet := testKubelet.kubelet
 | 
			
		||||
	handler := &api.Handler{
 | 
			
		||||
		HTTPGet: &api.HTTPGetAction{
 | 
			
		||||
			Host: "foo",
 | 
			
		||||
			Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt},
 | 
			
		||||
			Path: "bar",
 | 
			
		||||
 | 
			
		||||
	containerID := "abc1234"
 | 
			
		||||
	podName := "podFoo"
 | 
			
		||||
	podNamespace := "nsFoo"
 | 
			
		||||
	containerName := "containerFoo"
 | 
			
		||||
 | 
			
		||||
	container := api.Container{
 | 
			
		||||
		Name: containerName,
 | 
			
		||||
		Lifecycle: &api.Lifecycle{
 | 
			
		||||
			PostStart: &api.Handler{},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	actionHandler := kubelet.newActionHandler(handler)
 | 
			
		||||
	if actionHandler == nil {
 | 
			
		||||
		t.Error("unexpected nil action handler.")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	handler = &api.Handler{
 | 
			
		||||
		Exec: &api.ExecAction{
 | 
			
		||||
			Command: []string{"ls", "-l"},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	actionHandler = kubelet.newActionHandler(handler)
 | 
			
		||||
	if actionHandler == nil {
 | 
			
		||||
		t.Error("unexpected nil action handler.")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	handler = &api.Handler{}
 | 
			
		||||
	actionHandler = kubelet.newActionHandler(handler)
 | 
			
		||||
	if actionHandler != nil {
 | 
			
		||||
		t.Errorf("unexpected non-nil action handler: %v", actionHandler)
 | 
			
		||||
	pod := api.Pod{}
 | 
			
		||||
	pod.ObjectMeta.Name = podName
 | 
			
		||||
	pod.ObjectMeta.Namespace = podNamespace
 | 
			
		||||
	pod.Spec.Containers = []api.Container{container}
 | 
			
		||||
	err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		t.Errorf("expect error, but got nil")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -1809,6 +1817,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 | 
			
		||||
	kubelet.httpClient = &fakeHTTP{
 | 
			
		||||
		err: fmt.Errorf("test error"),
 | 
			
		||||
	}
 | 
			
		||||
	kubelet.handlerRunner = NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
 | 
			
		||||
 | 
			
		||||
	pods := []*api.Pod{
 | 
			
		||||
		{
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user