mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 02:08:13 +00:00 
			
		
		
		
	kubelet: plumb context for log requests
This allows kubelets to stop the necessary work when the context has been canceled (e.g., connection closed), and not leaking a goroutine and inotify watcher waiting indefinitely.
This commit is contained in:
		| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package container | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/url" | ||||
| @@ -113,7 +114,7 @@ type Runtime interface { | ||||
| 	// default, it returns a snapshot of the container log. Set 'follow' to true to | ||||
| 	// stream the log. Set 'follow' to false and specify the number of lines (e.g. | ||||
| 	// "100" or "all") to tail the log. | ||||
| 	GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) | ||||
| 	GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) | ||||
| 	// Delete a container. If the container is still running, an error is returned. | ||||
| 	DeleteContainer(containerID ContainerID) error | ||||
| 	// ImageService provides methods to image-related methods. | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package testing | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/url" | ||||
| @@ -289,7 +290,7 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS | ||||
| 	return &status, f.Err | ||||
| } | ||||
|  | ||||
| func (f *FakeRuntime) GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { | ||||
| func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
|  | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package testing | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"io" | ||||
| 	"time" | ||||
|  | ||||
| @@ -100,7 +101,7 @@ func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, | ||||
| 	return args.Error(0) | ||||
| } | ||||
|  | ||||
| func (r *Mock) GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { | ||||
| func (r *Mock) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { | ||||
| 	args := r.Called(pod, containerID, logOptions, stdout, stderr) | ||||
| 	return args.Error(0) | ||||
| } | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package dockershim | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"strconv" | ||||
| @@ -39,7 +40,7 @@ import ( | ||||
| // more functions. | ||||
| type DockerLegacyService interface { | ||||
| 	// GetContainerLogs gets logs for a specific container. | ||||
| 	GetContainerLogs(*v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error | ||||
| 	GetContainerLogs(context.Context, *v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error | ||||
|  | ||||
| 	// IsCRISupportedLogDriver checks whether the logging driver used by docker is | ||||
| 	// supported by native CRI integration. | ||||
| @@ -50,7 +51,7 @@ type DockerLegacyService interface { | ||||
| } | ||||
|  | ||||
| // GetContainerLogs get container logs directly from docker daemon. | ||||
| func (d *dockerService) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| func (d *dockerService) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| 	container, err := d.client.InspectContainer(containerID.ID) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @@ -97,7 +98,7 @@ func (d *dockerService) GetContainerLogTail(uid kubetypes.UID, name, namespace s | ||||
| 			Namespace: namespace, | ||||
| 		}, | ||||
| 	} | ||||
| 	err := d.GetContainerLogs(pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf) | ||||
| 	err := d.GetContainerLogs(context.Background(), pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
|   | ||||
| @@ -18,6 +18,7 @@ package kubelet | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| @@ -1159,7 +1160,7 @@ func (kl *Kubelet) validateContainerLogStatus(podName string, podStatus *v1.PodS | ||||
| // GetKubeletContainerLogs returns logs from the container | ||||
| // TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt | ||||
| // or all of them. | ||||
| func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| 	// Pod workers periodically write status to statusManager. If status is not | ||||
| 	// cached there, something is wrong (or kubelet just restarted and hasn't | ||||
| 	// caught up yet). Just assume the pod is not ready yet. | ||||
| @@ -1205,9 +1206,9 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, lo | ||||
| 		// dockerLegacyService should only be non-nil when we actually need it, so | ||||
| 		// inject it into the runtimeService. | ||||
| 		// TODO(random-liu): Remove this hack after deprecating unsupported log driver. | ||||
| 		return kl.dockerLegacyService.GetContainerLogs(pod, containerID, logOptions, stdout, stderr) | ||||
| 		return kl.dockerLegacyService.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr) | ||||
| 	} | ||||
| 	return kl.containerRuntime.GetContainerLogs(pod, containerID, logOptions, stdout, stderr) | ||||
| 	return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr) | ||||
| } | ||||
|  | ||||
| // getPhase returns the phase of a pod given its container info. | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package kuberuntime | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| @@ -366,7 +367,7 @@ func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessag | ||||
| func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string) string { | ||||
| 	value := int64(kubecontainer.MaxContainerTerminationMessageLogLines) | ||||
| 	buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength) | ||||
| 	if err := m.ReadLogs(path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil { | ||||
| 	if err := m.ReadLogs(context.Background(), path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil { | ||||
| 		return fmt.Sprintf("Error on reading termination message from logs: %v", err) | ||||
| 	} | ||||
| 	return buf.String() | ||||
| @@ -730,13 +731,13 @@ func findNextInitContainerToRun(pod *v1.Pod, podStatus *kubecontainer.PodStatus) | ||||
| } | ||||
|  | ||||
| // GetContainerLogs returns logs of a specific container. | ||||
| func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { | ||||
| func (m *kubeGenericRuntimeManager) GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { | ||||
| 	status, err := m.runtimeService.ContainerStatus(containerID.ID) | ||||
| 	if err != nil { | ||||
| 		glog.V(4).Infof("failed to get container status for %v: %v", containerID.String(), err) | ||||
| 		return fmt.Errorf("Unable to retrieve container logs for %v", containerID.String()) | ||||
| 	} | ||||
| 	return m.ReadLogs(status.GetLogPath(), containerID.ID, logOptions, stdout, stderr) | ||||
| 	return m.ReadLogs(ctx, status.GetLogPath(), containerID.ID, logOptions, stdout, stderr) | ||||
| } | ||||
|  | ||||
| // GetExec gets the endpoint the runtime will serve the exec request from. | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package kuberuntime | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"io" | ||||
| 	"time" | ||||
|  | ||||
| @@ -27,9 +28,9 @@ import ( | ||||
| // ReadLogs read the container log and redirect into stdout and stderr. | ||||
| // Note that containerID is only needed when following the log, or else | ||||
| // just pass in empty string "". | ||||
| func (m *kubeGenericRuntimeManager) ReadLogs(path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| func (m *kubeGenericRuntimeManager) ReadLogs(ctx context.Context, path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| 	// Convert v1.PodLogOptions into internal log options. | ||||
| 	opts := logs.NewLogOptions(apiOpts, time.Now()) | ||||
|  | ||||
| 	return logs.ReadLogs(path, containerID, opts, m.runtimeService, stdout, stderr) | ||||
| 	return logs.ReadLogs(ctx, path, containerID, opts, m.runtimeService, stdout, stderr) | ||||
| } | ||||
|   | ||||
| @@ -19,6 +19,7 @@ package logs | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| @@ -266,7 +267,7 @@ func (w *logWriter) write(msg *logMessage) error { | ||||
| // ReadLogs read the container log and redirect into stdout and stderr. | ||||
| // Note that containerID is only needed when following the log, or else | ||||
| // just pass in empty string "". | ||||
| func ReadLogs(path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error { | ||||
| func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error { | ||||
| 	f, err := os.Open(path) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to open log file %q: %v", path, err) | ||||
| @@ -317,7 +318,7 @@ func ReadLogs(path, containerID string, opts *LogOptions, runtimeService interna | ||||
| 					} | ||||
| 				} | ||||
| 				// Wait until the next log change. | ||||
| 				if found, err := waitLogs(containerID, watcher, runtimeService); !found { | ||||
| 				if found, err := waitLogs(ctx, containerID, watcher, runtimeService); !found { | ||||
| 					return err | ||||
| 				} | ||||
| 				continue | ||||
| @@ -371,7 +372,7 @@ func isContainerRunning(id string, r internalapi.RuntimeService) (bool, error) { | ||||
|  | ||||
| // waitLogs wait for the next log write. It returns a boolean and an error. The boolean | ||||
| // indicates whether a new log is found; the error is error happens during waiting new logs. | ||||
| func waitLogs(id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) { | ||||
| func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) { | ||||
| 	// no need to wait if the pod is not running | ||||
| 	if running, err := isContainerRunning(id, runtimeService); !running { | ||||
| 		return false, err | ||||
| @@ -379,6 +380,8 @@ func waitLogs(id string, w *fsnotify.Watcher, runtimeService internalapi.Runtime | ||||
| 	errRetry := 5 | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return false, fmt.Errorf("context cancelled") | ||||
| 		case e := <-w.Events: | ||||
| 			switch e.Op { | ||||
| 			case fsnotify.Write: | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package server | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"crypto/tls" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| @@ -172,7 +173,7 @@ type HostInterface interface { | ||||
| 	GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) | ||||
| 	GetRunningPods() ([]*v1.Pod, error) | ||||
| 	RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) | ||||
| 	GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error | ||||
| 	GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error | ||||
| 	ServeLogs(w http.ResponseWriter, req *http.Request) | ||||
| 	ResyncInterval() time.Duration | ||||
| 	GetHostname() string | ||||
| @@ -457,6 +458,7 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re | ||||
| 	podNamespace := request.PathParameter("podNamespace") | ||||
| 	podID := request.PathParameter("podID") | ||||
| 	containerName := request.PathParameter("containerName") | ||||
| 	ctx := request.Request.Context() | ||||
|  | ||||
| 	if len(podID) == 0 { | ||||
| 		// TODO: Why return JSON when the rest return plaintext errors? | ||||
| @@ -528,7 +530,7 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re | ||||
| 	} | ||||
| 	fw := flushwriter.Wrap(response.ResponseWriter) | ||||
| 	response.Header().Set("Transfer-Encoding", "chunked") | ||||
| 	if err := s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil { | ||||
| 	if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil { | ||||
| 		response.WriteError(http.StatusBadRequest, err) | ||||
| 		return | ||||
| 	} | ||||
|   | ||||
| @@ -18,6 +18,7 @@ package server | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| @@ -80,7 +81,7 @@ type fakeKubelet struct { | ||||
| 	getAttachCheck      func(string, types.UID, string, remotecommandserver.Options) | ||||
| 	getPortForwardCheck func(string, string, types.UID, portforward.V4Options) | ||||
|  | ||||
| 	containerLogsFunc func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error | ||||
| 	containerLogsFunc func(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error | ||||
| 	hostnameFunc      func() string | ||||
| 	resyncInterval    time.Duration | ||||
| 	loopEntryTime     time.Time | ||||
| @@ -128,8 +129,8 @@ func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) { | ||||
| 	fk.logFunc(w, req) | ||||
| } | ||||
|  | ||||
| func (fk *fakeKubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| 	return fk.containerLogsFunc(podFullName, containerName, logOptions, stdout, stderr) | ||||
| func (fk *fakeKubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| 	return fk.containerLogsFunc(ctx, podFullName, containerName, logOptions, stdout, stderr) | ||||
| } | ||||
|  | ||||
| func (fk *fakeKubelet) GetHostname() string { | ||||
| @@ -983,7 +984,7 @@ func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string) | ||||
| } | ||||
|  | ||||
| func setGetContainerLogsFunc(fw *serverTestFramework, t *testing.T, expectedPodName, expectedContainerName string, expectedLogOptions *v1.PodLogOptions, output string) { | ||||
| 	fw.fakeKubelet.containerLogsFunc = func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| 	fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { | ||||
| 		if podFullName != expectedPodName { | ||||
| 			t.Errorf("expected %s, got %s", expectedPodName, podFullName) | ||||
| 		} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Yu-Ju Hong
					Yu-Ju Hong