Changes to kubelet to support win containers

Alexander Brand
2016-08-24 10:49:06 -04:00
committed by Paulo Pires
parent 09285864db
commit 244152544c
8 changed files with 134 additions and 422 deletions


@@ -115,10 +115,6 @@ var (
defaultSeccompOpt = []dockerOpt{{"seccomp", "unconfined", ""}}
)
type WindowsDockerManager struct {
*DockerManager
}
type DockerManager struct {
client DockerInterface
recorder record.EventRecorder
@@ -206,71 +202,6 @@ func PodInfraContainerEnv(env map[string]string) kubecontainer.Option {
}
}
type windowsRuntimeHelper struct {
kubecontainer.RuntimeHelper
}
func (h *windowsRuntimeHelper) GenerateRunContainerOptions(pod *api.Pod, container *api.Container, podIP string) (*kubecontainer.RunContainerOptions, error) {
return &kubecontainer.RunContainerOptions{
Hostname: "test-pod",
}, nil
}
func NewWindowsDockerManager(
client DockerInterface,
recorder record.EventRecorder,
livenessManager proberesults.Manager,
containerRefManager *kubecontainer.RefManager,
podGetter podGetter,
machineInfo *cadvisorapi.MachineInfo,
podInfraContainerImage string,
qps float32,
burst int,
containerLogsDir string,
osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin,
runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HttpGetter,
execHandler ExecHandler,
oomAdjuster *oom.OOMAdjuster,
procFs procfs.ProcFSInterface,
cpuCFSQuota bool,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
enableCustomMetrics bool,
hairpinMode bool,
seccompProfileRoot string,
options ...kubecontainer.Option) *WindowsDockerManager {
dockerManager := NewDockerManager(client,
recorder,
livenessManager,
containerRefManager,
podGetter,
machineInfo,
podInfraContainerImage,
qps,
burst,
containerLogsDir,
osInterface,
networkPlugin,
runtimeHelper,
httpClient,
execHandler,
oomAdjuster,
procFs,
cpuCFSQuota,
imageBackOff,
serializeImagePulls,
enableCustomMetrics,
hairpinMode,
seccompProfileRoot,
options...)
return &WindowsDockerManager{
DockerManager: dockerManager,
}
}
func NewDockerManager(
client DockerInterface,
recorder record.EventRecorder,
@@ -410,19 +341,7 @@ var (
// that the container passed is the infrastructure container of a pod and the responsibility
// of the caller to ensure that the correct container is passed.
func (dm *DockerManager) determineContainerIP(podNamespace, podName string, container *dockertypes.ContainerJSON) (string, error) {
result := ""
if container.NetworkSettings != nil {
result = container.NetworkSettings.IPAddress
if len(result) == 0 {
for _, network := range container.NetworkSettings.Networks {
if len(network.IPAddress) > 0 {
result = network.IPAddress
break
}
}
}
}
result := getContainerIP(container)
networkMode := getDockerNetworkMode(container)
isHostNetwork := networkMode == namespaceModeHost
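
For context, the new getContainerIP call factors out the inline address lookup deleted above; a minimal sketch of such a helper, mirroring the removed logic (the real helper lives elsewhere in this package, and dockertypes is assumed to be the engine-api types alias this file already imports):

func getContainerIP(container *dockertypes.ContainerJSON) string {
    result := ""
    if container.NetworkSettings != nil {
        // Prefer the default bridge address reported by Docker.
        result = container.NetworkSettings.IPAddress
        // Fall back to the first address found on any attached network.
        if result == "" {
            for _, network := range container.NetworkSettings.Networks {
                if network.IPAddress != "" {
                    result = network.IPAddress
                    break
                }
            }
        }
    }
    return result
}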
@@ -707,7 +626,6 @@ func (dm *DockerManager) runContainer(
cpuShares = milliCPUToShares(cpuRequest.MilliValue())
}
var devices []dockercontainer.DeviceMapping
_ = devices
if nvidiaGPULimit.Value() != 0 {
// Experimental. For now, we hardcode /dev/nvidia0 no matter what the user asks for
// (we only support one device per node).
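
As an aside, the cpuShares value computed above follows Docker's convention of 1024 shares per core with a floor of 2; a sketch of the conversion, assuming the standard kubelet behavior (the constants here reflect that assumption and are not part of this diff):

func milliCPUToShares(milliCPU int64) int64 {
    const (
        minShares     = 2    // Docker enforces a minimum of 2 CPU shares.
        sharesPerCPU  = 1024 // Docker assigns 1024 shares per CPU core.
        milliCPUToCPU = 1000
    )
    if milliCPU == 0 {
        // Treat an unset request as the minimum, rather than letting
        // Docker default 0 to a full core's worth of shares.
        return minShares
    }
    shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
    if shares < minShares {
        return minShares
    }
    return shares
}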
@@ -741,23 +659,20 @@ func (dm *DockerManager) runContainer(
}
}
_ = cpuShares
_ = memoryLimit
_ = securityOpts
hc := &dockercontainer.HostConfig{
// Binds: binds,
NetworkMode: dockercontainer.NetworkMode(netMode),
// IpcMode: dockercontainer.IpcMode(ipcMode),
// UTSMode: dockercontainer.UTSMode(utsMode),
// PidMode: dockercontainer.PidMode(pidMode),
// ReadonlyRootfs: readOnlyRootFilesystem(container),
// Resources: dockercontainer.Resources{
// Memory: memoryLimit,
// MemorySwap: -1,
// CPUShares: cpuShares,
// Devices: devices,
// },
// SecurityOpt: securityOpts,
Binds: binds,
NetworkMode: dockercontainer.NetworkMode(netMode),
IpcMode: dockercontainer.IpcMode(ipcMode),
UTSMode: dockercontainer.UTSMode(utsMode),
PidMode: dockercontainer.PidMode(pidMode),
ReadonlyRootfs: readOnlyRootFilesystem(container),
Resources: dockercontainer.Resources{
Memory: memoryLimit,
MemorySwap: -1,
CPUShares: cpuShares,
Devices: devices,
},
SecurityOpt: fmtSecurityOpts,
}
// Set sysctls if requested
@@ -820,10 +735,10 @@ func (dm *DockerManager) runContainer(
glog.V(3).Infof("Container %v/%v/%v: setting entrypoint \"%v\" and command \"%v\"", pod.Namespace, pod.Name, container.Name, dockerOpts.Config.Entrypoint, dockerOpts.Config.Cmd)
// supplementalGids := dm.runtimeHelper.GetExtraSupplementalGroupsForPod(pod)
// securityContextProvider := securitycontext.NewSimpleSecurityContextProvider()
// securityContextProvider.ModifyContainerConfig(pod, container, dockerOpts.Config)
// securityContextProvider.ModifyHostConfig(pod, container, dockerOpts.HostConfig, supplementalGids)
supplementalGids := dm.runtimeHelper.GetExtraSupplementalGroupsForPod(pod)
securityContextProvider := securitycontext.NewSimpleSecurityContextProvider()
securityContextProvider.ModifyContainerConfig(pod, container, dockerOpts.Config)
securityContextProvider.ModifyHostConfig(pod, container, dockerOpts.HostConfig, supplementalGids)
createResp, err := dm.client.CreateContainer(dockerOpts)
if err != nil {
dm.recorder.Eventf(ref, api.EventTypeWarning, events.FailedToCreateContainer, "Failed to create docker container %q of pod %q with error: %v", container.Name, format.Pod(pod), err)
@@ -999,10 +914,6 @@ func (dm *DockerManager) PullImage(image kubecontainer.ImageSpec, secrets []api.
return dm.dockerPuller.Pull(image.Image, secrets)
}
func (dm *WindowsDockerManager) PullImage(image kubecontainer.ImageSpec, secrets []api.Secret) error {
return fmt.Errorf("Image pull not yet supported")
}
// IsImagePresent checks whether the container image is already in the local storage.
func (dm *DockerManager) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
return dm.dockerPuller.IsImagePresent(image.Image)
@@ -1821,9 +1732,9 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
// Check if current docker version is higher than 1.10. Otherwise, we have to apply OOMScoreAdj instead of using docker API.
// TODO: Remove this logic after we stop supporting docker version < 1.10.
// if err = dm.applyOOMScoreAdjIfNeeded(pod, container, containerInfo); err != nil {
// return kubecontainer.ContainerID{}, err
// }
if err = dm.applyOOMScoreAdjIfNeeded(pod, container, containerInfo); err != nil {
return kubecontainer.ContainerID{}, err
}
// The addNDotsOption call appends the ndots option to the resolv.conf file generated by docker.
// This resolv.conf file is shared by all containers of the same pod, and needs to be modified only once per pod.
@@ -2148,116 +2059,6 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, podStatus *kub
}, nil
}
func (dm *WindowsDockerManager) computePodContainerChanges(pod *api.Pod, podStatus *kubecontainer.PodStatus) (podContainerChangesSpec, error) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("computePodContainerChanges").Observe(metrics.SinceInMicroseconds(start))
}()
glog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod)
containersToStart := make(map[int]string)
containersToKeep := make(map[kubecontainer.DockerID]int)
// var err error
// Skip podInfraContainer checking and creation
// var podInfraContainerID kubecontainer.DockerID
// var changed bool
// podInfraContainerStatus := podStatus.FindContainerStatusByName(PodInfraContainerName)
// if podInfraContainerStatus != nil && podInfraContainerStatus.State == kubecontainer.ContainerStateRunning {
// glog.V(4).Infof("Found pod infra container for %q", format.Pod(pod))
// changed, err = dm.podInfraContainerChanged(pod, podInfraContainerStatus)
// if err != nil {
// return podContainerChangesSpec{}, err
// }
// }
// createPodInfraContainer := true
// if podInfraContainerStatus == nil || podInfraContainerStatus.State != kubecontainer.ContainerStateRunning {
// glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", format.Pod(pod))
// } else if changed {
// glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", format.Pod(pod))
// } else {
// glog.V(4).Infof("Pod infra container looks good, keep it %q", format.Pod(pod))
// createPodInfraContainer = false
// podInfraContainerID = kubecontainer.DockerID(podInfraContainerStatus.ID.ID)
// containersToKeep[podInfraContainerID] = -1
// }
for index, container := range pod.Spec.Containers {
expectedHash := kubecontainer.HashContainer(&container)
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
// If we are here it means that the container is dead and should be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and it has
// RestartPolicy::Always, but it's not a big deal.
message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
glog.V(3).Info(message)
containersToStart[index] = message
}
continue
}
containerID := kubecontainer.DockerID(containerStatus.ID.ID)
hash := containerStatus.Hash
glog.V(3).Infof("pod %q container %q exists as %v", format.Pod(pod), container.Name, containerID)
// if createPodInfraContainer {
// // createPodInfraContainer == true and Container exists
// // If we're creating infra container everything will be killed anyway
// // If RestartPolicy is Always or OnFailure we restart containers that were running before we
// // killed them when restarting Infra Container.
// if pod.Spec.RestartPolicy != api.RestartPolicyNever {
// message := fmt.Sprintf("Infra Container is being recreated. %q will be restarted.", container.Name)
// glog.V(1).Info(message)
// containersToStart[index] = message
// }
// continue
// }
// At this point, the container is running and pod infra container is good.
// We will look for changes and check healthiness for the container.
containerChanged := hash != 0 && hash != expectedHash
if containerChanged {
message := fmt.Sprintf("pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", format.Pod(pod), container.Name, hash, expectedHash)
glog.Info(message)
containersToStart[index] = message
continue
}
liveness, found := dm.livenessManager.Get(containerStatus.ID)
if !found || liveness == proberesults.Success {
containersToKeep[containerID] = index
continue
}
if pod.Spec.RestartPolicy != api.RestartPolicyNever {
message := fmt.Sprintf("pod %q container %q is unhealthy, it will be killed and re-created.", format.Pod(pod), container.Name)
glog.Info(message)
containersToStart[index] = message
}
}
// After the loop one of the following should be true:
// - createPodInfraContainer is true and containersToKeep is empty.
// (In fact, when createPodInfraContainer is false, containersToKeep will not be touched).
// - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container
// If Infra container is the last running one, we don't want to keep it.
// if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
// containersToKeep = make(map[kubecontainer.DockerID]int)
// }
return podContainerChangesSpec{
StartInfraContainer: false,
InfraChanged: false,
InfraContainerId: "",
ContainersToStart: containersToStart,
ContainersToKeep: containersToKeep,
}, nil
}
// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
start := time.Now()
@@ -2509,7 +2310,14 @@ func (dm *DockerManager) tryContainerStart(container *api.Container, pod *api.Po
restartCount = containerStatus.RestartCount + 1
}
_, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, pidMode, podIP, restartCount)
// Allow override of networking mode for specific platforms (e.g. Windows)
netMode := getNetworkingMode()
if netMode == "" {
// If not overridden, use the namespace mode
netMode = namespaceMode
}
_, err = dm.runContainerInPod(pod, container, netMode, namespaceMode, pidMode, podIP, restartCount)
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
return kubecontainer.ErrRunContainer, err.Error()
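
The getNetworkingMode hook introduced here gives platform-specific builds a way to override the container networking mode; a plausible sketch, assuming the helper lives in per-platform files selected by build tags (the file split and the Windows behavior are assumptions, not quoted from this commit):

// docker_manager_linux.go (illustrative file name)
// +build !windows

package dockertools

// getNetworkingMode returns no override on Linux, so the caller keeps the
// infra-container namespace mode it already computed.
func getNetworkingMode() string {
    return ""
}

A docker_manager_windows.go counterpart would return a networking mode Docker supports on Windows (for example its default NAT network), since Windows containers at this point could not join another container's network namespace.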
@@ -2600,196 +2408,6 @@ func findActiveInitContainer(pod *api.Pod, podStatus *kubecontainer.PodStatus) (
return &pod.Spec.InitContainers[0], nil, false
}
// Sync the running pod to match the specified desired pod.
func (dm *WindowsDockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
}()
containerChanges, err := dm.computePodContainerChanges(pod, podStatus)
if err != nil {
result.Fail(err)
return
}
glog.V(3).Infof("Got container changes for pod %q: %+v", format.Pod(pod), containerChanges)
// if containerChanges.InfraChanged {
// ref, err := api.GetReference(pod)
// if err != nil {
// glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
// }
// dm.recorder.Eventf(ref, api.EventTypeNormal, "InfraChanged", "Pod infrastructure changed, it will be killed and re-created.")
// }
// if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
// if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
// glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", format.Pod(pod))
// } else {
// glog.V(4).Infof("Killing Infra Container for %q, will start new one", format.Pod(pod))
// }
// // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
// // TODO(random-liu): We'll use pod status directly in the future
// killResult := dm.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(podStatus))
// result.AddPodSyncResult(killResult)
// if killResult.Error() != nil {
// return
// }
// } else {
// Otherwise kill any running containers in this pod which are not specified as ones to keep.
runningContainerStatues := podStatus.GetRunningContainerStatuses()
for _, containerStatus := range runningContainerStatues {
_, keep := containerChanges.ContainersToKeep[kubecontainer.DockerID(containerStatus.ID.ID)]
if !keep {
glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerStatus.Name, containerStatus.ID, format.Pod(pod))
// attempt to find the appropriate container policy
var podContainer *api.Container
var killMessage string
for i, c := range pod.Spec.Containers {
if c.Name == containerStatus.Name {
podContainer = &pod.Spec.Containers[i]
killMessage = containerChanges.ContainersToStart[i]
break
}
}
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerStatus.Name)
result.AddSyncResult(killContainerResult)
if err := dm.KillContainerInPod(containerStatus.ID, podContainer, pod, killMessage, nil); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
glog.Errorf("Error killing container %q(id=%q) for pod %q: %v", containerStatus.Name, containerStatus.ID, format.Pod(pod), err)
return
}
}
}
// }
// We pass the value of the podIP down to runContainerInPod, which in turn
// passes it to various other functions, in order to facilitate
// functionality that requires this value (hosts file and downward API)
// and avoid races determining the pod IP in cases where a container
// requires restart but the podIP isn't in the status manager yet.
//
// We default to the IP in the passed-in pod status, and overwrite it if the
// infra container needs to be (re)started.
podIP := ""
if podStatus != nil {
podIP = podStatus.IP
}
// If we should create infra container then we do it first.
// podInfraContainerID := containerChanges.InfraContainerId
// if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
// glog.V(4).Infof("Creating pod infra container for %q", format.Pod(pod))
// startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, PodInfraContainerName)
// result.AddSyncResult(startContainerResult)
// var msg string
// podInfraContainerID, err, msg = dm.createPodInfraContainer(pod)
// if err != nil {
// startContainerResult.Fail(err, msg)
// glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, format.Pod(pod))
// return
// }
// setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod))
// result.AddSyncResult(setupNetworkResult)
// if !usesHostNetwork(pod) {
// // Call the networking plugin
// err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)
// if err != nil {
// // TODO: (random-liu) There shouldn't be "Skipping pod" in sync result message
// message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v; Skipping pod", format.Pod(pod), dm.networkPlugin.Name(), err)
// setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message)
// glog.Error(message)
// // Delete infra container
// killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, PodInfraContainerName)
// result.AddSyncResult(killContainerResult)
// if delErr := dm.KillContainerInPod(kubecontainer.ContainerID{
// ID: string(podInfraContainerID),
// Type: "docker"}, nil, pod, message); delErr != nil {
// killContainerResult.Fail(kubecontainer.ErrKillContainer, delErr.Error())
// glog.Warningf("Clear infra container failed for pod %q: %v", format.Pod(pod), delErr)
// }
// return
// }
// // Setup the host interface unless the pod is on the host's network (FIXME: move to networkPlugin when ready)
// var podInfraContainer *docker.Container
// podInfraContainer, err = dm.client.InspectContainer(string(podInfraContainerID))
// if err != nil {
// glog.Errorf("Failed to inspect pod infra container: %v; Skipping pod %q", err, format.Pod(pod))
// result.Fail(err)
// return
// }
// if dm.configureHairpinMode {
// if err = hairpin.SetUpContainer(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil {
// glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err)
// }
// }
// // Overwrite the podIP passed in the pod status, since we just started the infra container.
// podIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer)
// }
// }
// Start everything
for idx := range containerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
// containerChanges.StartInfraContainer causes the containers to be restarted for config reasons
// ignore backoff
if !containerChanges.StartInfraContainer {
isInBackOff, err, msg := dm.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
continue
}
}
glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
// err, msg := dm.imagePuller.PullImage(pod, container, pullSecrets)
// if err != nil {
// startContainerResult.Fail(err, msg)
// continue
// }
// if container.SecurityContext != nil && container.SecurityContext.RunAsNonRoot != nil && *container.SecurityContext.RunAsNonRoot {
// err := dm.verifyNonRoot(container)
// if err != nil {
// startContainerResult.Fail(kubecontainer.ErrVerifyNonRoot, err.Error())
// glog.Errorf("Error running pod %q container %q: %v", format.Pod(pod), container.Name, err)
// continue
// }
// }
// For a new container, the RestartCount should be 0
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
restartCount = containerStatus.RestartCount + 1
}
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
// Note: when configuring the pod's containers anything that can be configured by pointing
// to the namespace of the infra container should use namespaceMode. This includes things like the net namespace
// and IPC namespace. PID mode cannot point to another container right now.
// See createPodInfraContainer for infra container setup.
// namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
namespaceMode := ""
_, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, getPidMode(pod), podIP, restartCount)
if err != nil {
startContainerResult.Fail(kubecontainer.ErrRunContainer, err.Error())
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running pod %q container %q: %v", format.Pod(pod), container.Name, err)
continue
}
// Successfully started the container; clear the entry in the failure
}
return
}
// verifyNonRoot returns an error if the container or image will run as the root user.
func (dm *DockerManager) verifyNonRoot(container *api.Container) error {
if securitycontext.HasRunAsUser(container) {
@@ -2996,7 +2614,7 @@ func (dm *DockerManager) GetPodStatus(uid kubetypes.UID, name, namespace string)
}
}
containerStatuses = append(containerStatuses, result)
if ip != "" {
if containerProvidesPodIP(dockerName) && ip != "" {
podStatus.IP = ip
}
}
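
The containerProvidesPodIP check added here is what lets the pod IP come from a regular container on platforms that have no infra (pause) container; a hedged sketch of the Linux-side predicate, assuming dockerName is the parsed kubelet container name (the per-platform split is an assumption):

// Illustrative Linux-side predicate: only the pod infra ("pause") container
// carries the pod-level IP, so every other container is ignored.
func containerProvidesPodIP(name *KubeletContainerName) bool {
    return name.ContainerName == PodInfraContainerName
}

A Windows counterpart would invert the rule, since there the workload container itself owns the pod's address.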