mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #12919 from gmarek/use_api_ports
Auto commit by PR queue bot
This commit is contained in:
		@@ -46,6 +46,7 @@ CLUSTER_IP_RANGE="${CLUSTER_IP_RANGE:-10.244.0.0/16}"
 | 
				
			|||||||
MINION_SCOPES="${MINION_SCOPES:-compute-rw,monitoring,logging-write,storage-ro}"
 | 
					MINION_SCOPES="${MINION_SCOPES:-compute-rw,monitoring,logging-write,storage-ro}"
 | 
				
			||||||
RUNTIME_CONFIG="${KUBE_RUNTIME_CONFIG:-}"
 | 
					RUNTIME_CONFIG="${KUBE_RUNTIME_CONFIG:-}"
 | 
				
			||||||
ENABLE_EXPERIMENTAL_API="${KUBE_ENABLE_EXPERIMENTAL_API:-false}"
 | 
					ENABLE_EXPERIMENTAL_API="${KUBE_ENABLE_EXPERIMENTAL_API:-false}"
 | 
				
			||||||
 | 
					KUBELET_PORT="${KUBELET_PORT:-10250}"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
 | 
					# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
 | 
				
			||||||
POLL_SLEEP_INTERVAL="${POLL_SLEEP_INTERVAL:-3}"
 | 
					POLL_SLEEP_INTERVAL="${POLL_SLEEP_INTERVAL:-3}"
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -48,6 +48,7 @@ MINION_SCOPES="${MINION_SCOPES:-compute-rw,monitoring,logging-write,storage-ro}"
 | 
				
			|||||||
RUNTIME_CONFIG="${KUBE_RUNTIME_CONFIG:-}"
 | 
					RUNTIME_CONFIG="${KUBE_RUNTIME_CONFIG:-}"
 | 
				
			||||||
ENABLE_EXPERIMENTAL_API="${KUBE_ENABLE_EXPERIMENTAL_API:-true}"
 | 
					ENABLE_EXPERIMENTAL_API="${KUBE_ENABLE_EXPERIMENTAL_API:-true}"
 | 
				
			||||||
TERMINATED_POD_GC_THRESHOLD=${TERMINATED_POD_GC_THRESHOLD:-100}
 | 
					TERMINATED_POD_GC_THRESHOLD=${TERMINATED_POD_GC_THRESHOLD:-100}
 | 
				
			||||||
 | 
					KUBELET_PORT="${KUBELET_PORT:-10250}"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
 | 
					# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
 | 
				
			||||||
POLL_SLEEP_INTERVAL=3
 | 
					POLL_SLEEP_INTERVAL=3
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -284,6 +284,7 @@ opencontrail_public_subnet: '$(echo "$OPENCONTRAIL_PUBLIC_SUBNET")'
 | 
				
			|||||||
enable_manifest_url: '$(echo "$ENABLE_MANIFEST_URL" | sed -e "s/'/''/g")'
 | 
					enable_manifest_url: '$(echo "$ENABLE_MANIFEST_URL" | sed -e "s/'/''/g")'
 | 
				
			||||||
manifest_url: '$(echo "$MANIFEST_URL" | sed -e "s/'/''/g")'
 | 
					manifest_url: '$(echo "$MANIFEST_URL" | sed -e "s/'/''/g")'
 | 
				
			||||||
manifest_url_header: '$(echo "$MANIFEST_URL_HEADER" | sed -e "s/'/''/g")'
 | 
					manifest_url_header: '$(echo "$MANIFEST_URL_HEADER" | sed -e "s/'/''/g")'
 | 
				
			||||||
 | 
					kubelet_port: '$(echo "$KUBELET_PORT")'
 | 
				
			||||||
EOF
 | 
					EOF
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if [ -n "${APISERVER_TEST_ARGS:-}" ]; then
 | 
					    if [ -n "${APISERVER_TEST_ARGS:-}" ]; then
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -60,6 +60,7 @@ NETWORK_PROVIDER: $(yaml-quote ${NETWORK_PROVIDER:-})
 | 
				
			|||||||
OPENCONTRAIL_TAG: $(yaml-quote ${OPENCONTRAIL_TAG:-})
 | 
					OPENCONTRAIL_TAG: $(yaml-quote ${OPENCONTRAIL_TAG:-})
 | 
				
			||||||
OPENCONTRAIL_KUBERNETES_TAG: $(yaml-quote ${OPENCONTRAIL_KUBERNETES_TAG:-})
 | 
					OPENCONTRAIL_KUBERNETES_TAG: $(yaml-quote ${OPENCONTRAIL_KUBERNETES_TAG:-})
 | 
				
			||||||
OPENCONTRAIL_PUBLIC_SUBNET: $(yaml-quote ${OPENCONTRAIL_PUBLIC_SUBNET:-})
 | 
					OPENCONTRAIL_PUBLIC_SUBNET: $(yaml-quote ${OPENCONTRAIL_PUBLIC_SUBNET:-})
 | 
				
			||||||
 | 
					KUBELET_PORT: $(yaml-quote ${KUBELET_PORT})
 | 
				
			||||||
EOF
 | 
					EOF
 | 
				
			||||||
  if [ -n "${KUBE_APISERVER_REQUEST_TIMEOUT:-}" ]; then
 | 
					  if [ -n "${KUBE_APISERVER_REQUEST_TIMEOUT:-}" ]; then
 | 
				
			||||||
    cat >>$file <<EOF
 | 
					    cat >>$file <<EOF
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -30,7 +30,9 @@ source "${KUBE_ROOT}/cluster/gce/debian/helper.sh"
 | 
				
			|||||||
# $1: template name (required)
 | 
					# $1: template name (required)
 | 
				
			||||||
function create-node-instance-template {
 | 
					function create-node-instance-template {
 | 
				
			||||||
  local template_name="$1"
 | 
					  local template_name="$1"
 | 
				
			||||||
 | 
					  sed "s/##KUBELET_PORT##/${KUBELET_PORT}/g" ${KUBE_ROOT}/cluster/gce/trusty/node_template.yaml > ${KUBE_ROOT}/cluster/gce/trusty/node.yaml
 | 
				
			||||||
  create-node-template "$template_name" "${scope_flags[*]}" \
 | 
					  create-node-template "$template_name" "${scope_flags[*]}" \
 | 
				
			||||||
		"kube-env=${KUBE_TEMP}/node-kube-env.yaml" \
 | 
							"kube-env=${KUBE_TEMP}/node-kube-env.yaml" \
 | 
				
			||||||
    "user-data=${KUBE_ROOT}/cluster/gce/trusty/node.yaml"
 | 
					    "user-data=${KUBE_ROOT}/cluster/gce/trusty/node.yaml"
 | 
				
			||||||
 | 
					  rm ${KUBE_ROOT}/cluster/gce/trusty/node.yaml
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -403,7 +403,7 @@ script
 | 
				
			|||||||
			echo "Docker daemon failed!"
 | 
								echo "Docker daemon failed!"
 | 
				
			||||||
			pkill docker
 | 
								pkill docker
 | 
				
			||||||
		fi
 | 
							fi
 | 
				
			||||||
		if ! curl --insecure -m ${max_seconds} -f -s https://127.0.0.1:10250/healthz > /dev/null; then
 | 
							if ! curl --insecure -m ${max_seconds} -f -s https://127.0.0.1:##KUBELET_PORT##/healthz > /dev/null; then
 | 
				
			||||||
			echo "Kubelet is unhealthy!"
 | 
								echo "Kubelet is unhealthy!"
 | 
				
			||||||
			pkill kubelet
 | 
								pkill kubelet
 | 
				
			||||||
		fi
 | 
							fi
 | 
				
			||||||
@@ -116,5 +116,10 @@
 | 
				
			|||||||
  {% set network_plugin = "--network-plugin=opencontrail" %}
 | 
					  {% set network_plugin = "--network-plugin=opencontrail" %}
 | 
				
			||||||
{% endif -%}
 | 
					{% endif -%}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					{% set kubelet_port = "--port=10250" -%}
 | 
				
			||||||
 | 
					{% if pillar['kubelet_port'] is defined -%}
 | 
				
			||||||
 | 
					  {% set kubelet_port="--port=" + pillar['kubelet_port'] %}
 | 
				
			||||||
 | 
					{% endif -%}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# test_args has to be kept at the end, so they'll overwrite any prior configuration
 | 
					# test_args has to be kept at the end, so they'll overwrite any prior configuration
 | 
				
			||||||
DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} {{manifest_url}} --allow-privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}} {{pod_cidr}} {{ master_kubelet_args }} {{cpu_cfs_quota}} {{network_plugin}} {{test_args}}"
 | 
					DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} {{manifest_url}} --allow-privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}} {{pod_cidr}} {{ master_kubelet_args }} {{cpu_cfs_quota}} {{network_plugin}} {{kubelet_port}} {{test_args}}"
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -240,6 +240,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
 | 
				
			|||||||
	// Kubelet related flags:
 | 
						// Kubelet related flags:
 | 
				
			||||||
	fs.BoolVar(&s.KubeletConfig.EnableHttps, "kubelet-https", s.KubeletConfig.EnableHttps, "Use https for kubelet connections")
 | 
						fs.BoolVar(&s.KubeletConfig.EnableHttps, "kubelet-https", s.KubeletConfig.EnableHttps, "Use https for kubelet connections")
 | 
				
			||||||
	fs.UintVar(&s.KubeletConfig.Port, "kubelet-port", s.KubeletConfig.Port, "Kubelet port")
 | 
						fs.UintVar(&s.KubeletConfig.Port, "kubelet-port", s.KubeletConfig.Port, "Kubelet port")
 | 
				
			||||||
 | 
						fs.MarkDeprecated("kubelet-port", "kubelet-port is deprecated and will be removed")
 | 
				
			||||||
	fs.DurationVar(&s.KubeletConfig.HTTPTimeout, "kubelet-timeout", s.KubeletConfig.HTTPTimeout, "Timeout for kubelet operations")
 | 
						fs.DurationVar(&s.KubeletConfig.HTTPTimeout, "kubelet-timeout", s.KubeletConfig.HTTPTimeout, "Timeout for kubelet operations")
 | 
				
			||||||
	fs.StringVar(&s.KubeletConfig.CertFile, "kubelet-client-certificate", s.KubeletConfig.CertFile, "Path to a client cert file for TLS.")
 | 
						fs.StringVar(&s.KubeletConfig.CertFile, "kubelet-client-certificate", s.KubeletConfig.CertFile, "Path to a client cert file for TLS.")
 | 
				
			||||||
	fs.StringVar(&s.KubeletConfig.KeyFile, "kubelet-client-key", s.KubeletConfig.KeyFile, "Path to a client key file for TLS.")
 | 
						fs.StringVar(&s.KubeletConfig.KeyFile, "kubelet-client-key", s.KubeletConfig.KeyFile, "Path to a client key file for TLS.")
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -40,8 +40,7 @@ func TestHTTPKubeletClient(t *testing.T) {
 | 
				
			|||||||
	testServer := httptest.NewServer(&fakeHandler)
 | 
						testServer := httptest.NewServer(&fakeHandler)
 | 
				
			||||||
	defer testServer.Close()
 | 
						defer testServer.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, err = url.Parse(testServer.URL)
 | 
						if _, err := url.Parse(testServer.URL); err != nil {
 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Errorf("unexpected error: %v", err)
 | 
							t.Errorf("unexpected error: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -538,7 +538,6 @@ func (m *Master) init(c *Config) {
 | 
				
			|||||||
	healthzChecks := []healthz.HealthzChecker{}
 | 
						healthzChecks := []healthz.HealthzChecker{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
 | 
						dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
 | 
				
			||||||
	podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
 | 
						podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -560,6 +559,14 @@ func (m *Master) init(c *Config) {
 | 
				
			|||||||
	nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
 | 
						nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
 | 
				
			||||||
	m.nodeRegistry = node.NewRegistry(nodeStorage)
 | 
						m.nodeRegistry = node.NewRegistry(nodeStorage)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						podStorage := podetcd.NewStorage(
 | 
				
			||||||
 | 
							dbClient("pods"),
 | 
				
			||||||
 | 
							nodeStorage,
 | 
				
			||||||
 | 
							c.EnableWatchCache,
 | 
				
			||||||
 | 
							c.KubeletClient,
 | 
				
			||||||
 | 
							m.proxyTransport,
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	serviceStorage := serviceetcd.NewREST(dbClient("services"))
 | 
						serviceStorage := serviceetcd.NewREST(dbClient("services"))
 | 
				
			||||||
	m.serviceRegistry = service.NewRegistry(serviceStorage)
 | 
						m.serviceRegistry = service.NewRegistry(serviceStorage)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,6 +17,7 @@ limitations under the License.
 | 
				
			|||||||
package etcd
 | 
					package etcd
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"net/url"
 | 
						"net/url"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -102,3 +103,25 @@ var _ = rest.Redirector(&REST{})
 | 
				
			|||||||
func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
 | 
					func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
 | 
				
			||||||
	return node.ResourceLocation(r, r.connection, r.proxyTransport, ctx, id)
 | 
						return node.ResourceLocation(r, r.connection, r.proxyTransport, ctx, id)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// An interface for types that allow getting information about a Node on which give pod is running.
 | 
				
			||||||
 | 
					type HostLocator interface {
 | 
				
			||||||
 | 
						HostKubeletPort(pod *api.Pod, ctx api.Context) (int, error)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ = HostLocator(&REST{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (r *REST) HostKubeletPort(pod *api.Pod, ctx api.Context) (int, error) {
 | 
				
			||||||
 | 
						obj, err := r.Get(ctx, pod.Spec.NodeName)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return 0, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						node := obj.(*api.Node)
 | 
				
			||||||
 | 
						if node == nil {
 | 
				
			||||||
 | 
							return 0, fmt.Errorf("Unexpected object type: %#v", node)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if node.Status.DaemonEndpoints.KubeletEndpoint.Port == 0 {
 | 
				
			||||||
 | 
							return -1, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return node.Status.DaemonEndpoints.KubeletEndpoint.Port, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -153,17 +153,30 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	host := hostIP.String()
 | 
						host := hostIP.String()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if portReq == "" || strconv.Itoa(ports.KubeletPort) == portReq {
 | 
						// We check if we want to get a default Kubelet's transport. It happens if either:
 | 
				
			||||||
		// Ignore requested scheme, use scheme provided by GetConnectionInfo
 | 
						// - no port is specified in request (Kubelet's port is default),
 | 
				
			||||||
 | 
						// - we're using Port stored as a DaemonEndpoint and requested port is a Kubelet's port stored in the DaemonEndpoint,
 | 
				
			||||||
 | 
						// - there's no information in the API about DaemonEnpoint (legacy cluster) and requested port is equal to ports.KubeletPort (cluster-wide config)
 | 
				
			||||||
 | 
						defaultKubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port
 | 
				
			||||||
 | 
						if defaultKubeletPort == 0 {
 | 
				
			||||||
 | 
							defaultKubeletPort = ports.KubeletPort
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if portReq == "" || strconv.Itoa(defaultKubeletPort) == portReq {
 | 
				
			||||||
		scheme, port, kubeletTransport, err := connection.GetConnectionInfo(host)
 | 
							scheme, port, kubeletTransport, err := connection.GetConnectionInfo(host)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return nil, nil, err
 | 
								return nil, nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							var portString string
 | 
				
			||||||
 | 
							if node.Status.DaemonEndpoints.KubeletEndpoint.Port != 0 {
 | 
				
			||||||
 | 
								portString = strconv.Itoa(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								portString = strconv.FormatUint(uint64(port), 10)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		return &url.URL{
 | 
							return &url.URL{
 | 
				
			||||||
				Scheme: scheme,
 | 
									Scheme: scheme,
 | 
				
			||||||
				Host: net.JoinHostPort(
 | 
									Host: net.JoinHostPort(
 | 
				
			||||||
					host,
 | 
										host,
 | 
				
			||||||
					strconv.FormatUint(uint64(port), 10),
 | 
										portString,
 | 
				
			||||||
				),
 | 
									),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			kubeletTransport,
 | 
								kubeletTransport,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -59,8 +59,19 @@ type REST struct {
 | 
				
			|||||||
	proxyTransport http.RoundTripper
 | 
						proxyTransport http.RoundTripper
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Defined in pkg/registry/node/etcd/etcd.go
 | 
				
			||||||
 | 
					type HostLocator interface {
 | 
				
			||||||
 | 
						HostKubeletPort(pod *api.Pod, ctx api.Context) (int, error)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewStorage returns a RESTStorage object that will work against pods.
 | 
					// NewStorage returns a RESTStorage object that will work against pods.
 | 
				
			||||||
func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
 | 
					func NewStorage(
 | 
				
			||||||
 | 
						s storage.Interface,
 | 
				
			||||||
 | 
						hostLocator HostLocator,
 | 
				
			||||||
 | 
						useCacher bool,
 | 
				
			||||||
 | 
						k client.ConnectionInfoGetter,
 | 
				
			||||||
 | 
						proxyTransport http.RoundTripper,
 | 
				
			||||||
 | 
					) PodStorage {
 | 
				
			||||||
	prefix := "/pods"
 | 
						prefix := "/pods"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	storageInterface := s
 | 
						storageInterface := s
 | 
				
			||||||
@@ -110,11 +121,11 @@ func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGett
 | 
				
			|||||||
		Pod:         &REST{store, proxyTransport},
 | 
							Pod:         &REST{store, proxyTransport},
 | 
				
			||||||
		Binding:     &BindingREST{store: store},
 | 
							Binding:     &BindingREST{store: store},
 | 
				
			||||||
		Status:      &StatusREST{store: &statusStore},
 | 
							Status:      &StatusREST{store: &statusStore},
 | 
				
			||||||
		Log:         &podrest.LogREST{Store: store, KubeletConn: k},
 | 
							Log:         &podrest.LogREST{Store: store, HostLocator: hostLocator, KubeletConn: k},
 | 
				
			||||||
		Proxy:       &ProxyREST{store: store, proxyTransport: proxyTransport},
 | 
							Proxy:       &ProxyREST{store: store, proxyTransport: proxyTransport},
 | 
				
			||||||
		Exec:        &ExecREST{store: store, kubeletConn: k},
 | 
							Exec:        &ExecREST{store: store, hostLocator: hostLocator, kubeletConn: k},
 | 
				
			||||||
		Attach:      &AttachREST{store: store, kubeletConn: k},
 | 
							Attach:      &AttachREST{store: store, hostLocator: hostLocator, kubeletConn: k},
 | 
				
			||||||
		PortForward: &PortForwardREST{store: store, kubeletConn: k},
 | 
							PortForward: &PortForwardREST{store: store, hostLocator: hostLocator, kubeletConn: k},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -262,6 +273,7 @@ var upgradeableMethods = []string{"GET", "POST"}
 | 
				
			|||||||
type AttachREST struct {
 | 
					type AttachREST struct {
 | 
				
			||||||
	store       *etcdgeneric.Etcd
 | 
						store       *etcdgeneric.Etcd
 | 
				
			||||||
	kubeletConn client.ConnectionInfoGetter
 | 
						kubeletConn client.ConnectionInfoGetter
 | 
				
			||||||
 | 
						hostLocator HostLocator
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Implement Connecter
 | 
					// Implement Connecter
 | 
				
			||||||
@@ -278,10 +290,14 @@ func (r *AttachREST) Connect(ctx api.Context, name string, opts runtime.Object,
 | 
				
			|||||||
	if !ok {
 | 
						if !ok {
 | 
				
			||||||
		return nil, fmt.Errorf("Invalid options object: %#v", opts)
 | 
							return nil, fmt.Errorf("Invalid options object: %#v", opts)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	location, transport, err := pod.AttachLocation(r.store, r.kubeletConn, ctx, name, attachOpts)
 | 
						location, transport, err := pod.AttachLocation(r.store, r.kubeletConn, ctx, name, attachOpts, r.hostLocator)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if location.Host == "" {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("Empty location.Host in %#v", location)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
 | 
						return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -300,6 +316,7 @@ func (r *AttachREST) ConnectMethods() []string {
 | 
				
			|||||||
type ExecREST struct {
 | 
					type ExecREST struct {
 | 
				
			||||||
	store       *etcdgeneric.Etcd
 | 
						store       *etcdgeneric.Etcd
 | 
				
			||||||
	kubeletConn client.ConnectionInfoGetter
 | 
						kubeletConn client.ConnectionInfoGetter
 | 
				
			||||||
 | 
						hostLocator HostLocator
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Implement Connecter
 | 
					// Implement Connecter
 | 
				
			||||||
@@ -316,10 +333,14 @@ func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object, re
 | 
				
			|||||||
	if !ok {
 | 
						if !ok {
 | 
				
			||||||
		return nil, fmt.Errorf("invalid options object: %#v", opts)
 | 
							return nil, fmt.Errorf("invalid options object: %#v", opts)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	location, transport, err := pod.ExecLocation(r.store, r.kubeletConn, ctx, name, execOpts)
 | 
						location, transport, err := pod.ExecLocation(r.store, r.kubeletConn, ctx, name, execOpts, r.hostLocator)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if location.Host == "" {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("Empty location.Host in %#v", location)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
 | 
						return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -337,6 +358,7 @@ func (r *ExecREST) ConnectMethods() []string {
 | 
				
			|||||||
// TODO: move me into pod/rest - I'm generic to store type via ResourceGetter
 | 
					// TODO: move me into pod/rest - I'm generic to store type via ResourceGetter
 | 
				
			||||||
type PortForwardREST struct {
 | 
					type PortForwardREST struct {
 | 
				
			||||||
	store       *etcdgeneric.Etcd
 | 
						store       *etcdgeneric.Etcd
 | 
				
			||||||
 | 
						hostLocator HostLocator
 | 
				
			||||||
	kubeletConn client.ConnectionInfoGetter
 | 
						kubeletConn client.ConnectionInfoGetter
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -360,10 +382,14 @@ func (r *PortForwardREST) ConnectMethods() []string {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// Connect returns a handler for the pod portforward proxy
 | 
					// Connect returns a handler for the pod portforward proxy
 | 
				
			||||||
func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
 | 
					func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
 | 
				
			||||||
	location, transport, err := pod.PortForwardLocation(r.store, r.kubeletConn, ctx, name)
 | 
						location, transport, err := pod.PortForwardLocation(r.store, r.kubeletConn, ctx, name, r.hostLocator)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if location.Host == "" {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("Empty location.Host in %#v", location)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
 | 
						return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -28,6 +28,7 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/api/testapi"
 | 
						"k8s.io/kubernetes/pkg/api/testapi"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/fields"
 | 
						"k8s.io/kubernetes/pkg/fields"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/labels"
 | 
						"k8s.io/kubernetes/pkg/labels"
 | 
				
			||||||
 | 
						nodeetcd "k8s.io/kubernetes/pkg/registry/node/etcd"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/registry/registrytest"
 | 
						"k8s.io/kubernetes/pkg/registry/registrytest"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
						"k8s.io/kubernetes/pkg/runtime"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/securitycontext"
 | 
						"k8s.io/kubernetes/pkg/securitycontext"
 | 
				
			||||||
@@ -36,10 +37,11 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/util"
 | 
						"k8s.io/kubernetes/pkg/util"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient) {
 | 
					func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, PodStorage) {
 | 
				
			||||||
	etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
 | 
						etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
 | 
				
			||||||
	storage := NewStorage(etcdStorage, false, nil, nil)
 | 
						nodeREST, _ := nodeetcd.NewREST(etcdStorage, false, nil, nil)
 | 
				
			||||||
	return storage.Pod, storage.Binding, storage.Status, fakeClient
 | 
						storage := NewStorage(etcdStorage, nodeREST, false, nil, nil)
 | 
				
			||||||
 | 
						return storage.Pod, storage.Binding, storage.Status, fakeClient, storage
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func validNewPod() *api.Pod {
 | 
					func validNewPod() *api.Pod {
 | 
				
			||||||
@@ -79,7 +81,7 @@ func validChangedPod() *api.Pod {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestCreate(t *testing.T) {
 | 
					func TestCreate(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
						test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
				
			||||||
	pod := validNewPod()
 | 
						pod := validNewPod()
 | 
				
			||||||
	pod.ObjectMeta = api.ObjectMeta{}
 | 
						pod.ObjectMeta = api.ObjectMeta{}
 | 
				
			||||||
@@ -104,7 +106,7 @@ func TestCreate(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestUpdate(t *testing.T) {
 | 
					func TestUpdate(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
						test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
				
			||||||
	test.TestUpdate(
 | 
						test.TestUpdate(
 | 
				
			||||||
		// valid
 | 
							// valid
 | 
				
			||||||
@@ -119,7 +121,7 @@ func TestUpdate(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestDelete(t *testing.T) {
 | 
					func TestDelete(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	test := registrytest.New(t, fakeClient, storage.Etcd).ReturnDeletedObject()
 | 
						test := registrytest.New(t, fakeClient, storage.Etcd).ReturnDeletedObject()
 | 
				
			||||||
	test.TestDelete(validNewPod())
 | 
						test.TestDelete(validNewPod())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -129,7 +131,7 @@ func TestDelete(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestCreateRegistryError(t *testing.T) {
 | 
					func TestCreateRegistryError(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	fakeClient.Err = fmt.Errorf("test error")
 | 
						fakeClient.Err = fmt.Errorf("test error")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pod := validNewPod()
 | 
						pod := validNewPod()
 | 
				
			||||||
@@ -140,7 +142,7 @@ func TestCreateRegistryError(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestCreateSetsFields(t *testing.T) {
 | 
					func TestCreateSetsFields(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	pod := validNewPod()
 | 
						pod := validNewPod()
 | 
				
			||||||
	_, err := storage.Create(api.NewDefaultContext(), pod)
 | 
						_, err := storage.Create(api.NewDefaultContext(), pod)
 | 
				
			||||||
	if err != fakeClient.Err {
 | 
						if err != fakeClient.Err {
 | 
				
			||||||
@@ -254,7 +256,7 @@ func TestResourceLocation(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	for _, tc := range testCases {
 | 
						for _, tc := range testCases {
 | 
				
			||||||
		storage, _, _, fakeClient := newStorage(t)
 | 
							storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
		key, _ := storage.KeyFunc(ctx, tc.pod.Name)
 | 
							key, _ := storage.KeyFunc(ctx, tc.pod.Name)
 | 
				
			||||||
		key = etcdtest.AddPrefix(key)
 | 
							key = etcdtest.AddPrefix(key)
 | 
				
			||||||
		if _, err := fakeClient.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &tc.pod), 0); err != nil {
 | 
							if _, err := fakeClient.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &tc.pod), 0); err != nil {
 | 
				
			||||||
@@ -280,19 +282,19 @@ func TestResourceLocation(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestGet(t *testing.T) {
 | 
					func TestGet(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
						test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
				
			||||||
	test.TestGet(validNewPod())
 | 
						test.TestGet(validNewPod())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestList(t *testing.T) {
 | 
					func TestList(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
						test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
				
			||||||
	test.TestList(validNewPod())
 | 
						test.TestList(validNewPod())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestWatch(t *testing.T) {
 | 
					func TestWatch(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
						test := registrytest.New(t, fakeClient, storage.Etcd)
 | 
				
			||||||
	test.TestWatch(
 | 
						test.TestWatch(
 | 
				
			||||||
		validNewPod(),
 | 
							validNewPod(),
 | 
				
			||||||
@@ -314,7 +316,7 @@ func TestWatch(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEtcdCreate(t *testing.T) {
 | 
					func TestEtcdCreate(t *testing.T) {
 | 
				
			||||||
	storage, bindingStorage, _, fakeClient := newStorage(t)
 | 
						storage, bindingStorage, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
						fakeClient.TestIndex = true
 | 
				
			||||||
	key, _ := storage.KeyFunc(ctx, "foo")
 | 
						key, _ := storage.KeyFunc(ctx, "foo")
 | 
				
			||||||
@@ -352,7 +354,7 @@ func TestEtcdCreate(t *testing.T) {
 | 
				
			|||||||
// Ensure that when scheduler creates a binding for a pod that has already been deleted
 | 
					// Ensure that when scheduler creates a binding for a pod that has already been deleted
 | 
				
			||||||
// by the API server, API server returns not-found error.
 | 
					// by the API server, API server returns not-found error.
 | 
				
			||||||
func TestEtcdCreateBindingNoPod(t *testing.T) {
 | 
					func TestEtcdCreateBindingNoPod(t *testing.T) {
 | 
				
			||||||
	storage, bindingStorage, _, fakeClient := newStorage(t)
 | 
						storage, bindingStorage, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
						fakeClient.TestIndex = true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -384,7 +386,7 @@ func TestEtcdCreateBindingNoPod(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEtcdCreateFailsWithoutNamespace(t *testing.T) {
 | 
					func TestEtcdCreateFailsWithoutNamespace(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
						fakeClient.TestIndex = true
 | 
				
			||||||
	pod := validNewPod()
 | 
						pod := validNewPod()
 | 
				
			||||||
	pod.Namespace = ""
 | 
						pod.Namespace = ""
 | 
				
			||||||
@@ -396,7 +398,7 @@ func TestEtcdCreateFailsWithoutNamespace(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEtcdCreateWithContainersNotFound(t *testing.T) {
 | 
					func TestEtcdCreateWithContainersNotFound(t *testing.T) {
 | 
				
			||||||
	storage, bindingStorage, _, fakeClient := newStorage(t)
 | 
						storage, bindingStorage, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
						fakeClient.TestIndex = true
 | 
				
			||||||
	key, _ := storage.KeyFunc(ctx, "foo")
 | 
						key, _ := storage.KeyFunc(ctx, "foo")
 | 
				
			||||||
@@ -439,7 +441,7 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEtcdCreateWithConflict(t *testing.T) {
 | 
					func TestEtcdCreateWithConflict(t *testing.T) {
 | 
				
			||||||
	storage, bindingStorage, _, fakeClient := newStorage(t)
 | 
						storage, bindingStorage, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
						fakeClient.TestIndex = true
 | 
				
			||||||
	key, _ := storage.KeyFunc(ctx, "foo")
 | 
						key, _ := storage.KeyFunc(ctx, "foo")
 | 
				
			||||||
@@ -471,7 +473,7 @@ func TestEtcdCreateWithConflict(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEtcdCreateWithExistingContainers(t *testing.T) {
 | 
					func TestEtcdCreateWithExistingContainers(t *testing.T) {
 | 
				
			||||||
	storage, bindingStorage, _, fakeClient := newStorage(t)
 | 
						storage, bindingStorage, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
						fakeClient.TestIndex = true
 | 
				
			||||||
	key, _ := storage.KeyFunc(ctx, "foo")
 | 
						key, _ := storage.KeyFunc(ctx, "foo")
 | 
				
			||||||
@@ -543,7 +545,7 @@ func TestEtcdCreateBinding(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for k, test := range testCases {
 | 
						for k, test := range testCases {
 | 
				
			||||||
		storage, bindingStorage, _, fakeClient := newStorage(t)
 | 
							storage, bindingStorage, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
		key, _ := storage.KeyFunc(ctx, "foo")
 | 
							key, _ := storage.KeyFunc(ctx, "foo")
 | 
				
			||||||
		key = etcdtest.AddPrefix(key)
 | 
							key = etcdtest.AddPrefix(key)
 | 
				
			||||||
		fakeClient.ExpectNotFoundGet(key)
 | 
							fakeClient.ExpectNotFoundGet(key)
 | 
				
			||||||
@@ -566,7 +568,7 @@ func TestEtcdCreateBinding(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEtcdUpdateNotScheduled(t *testing.T) {
 | 
					func TestEtcdUpdateNotScheduled(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
						fakeClient.TestIndex = true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -591,7 +593,7 @@ func TestEtcdUpdateNotScheduled(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEtcdUpdateScheduled(t *testing.T) {
 | 
					func TestEtcdUpdateScheduled(t *testing.T) {
 | 
				
			||||||
	storage, _, _, fakeClient := newStorage(t)
 | 
						storage, _, _, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
						fakeClient.TestIndex = true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -657,7 +659,7 @@ func TestEtcdUpdateScheduled(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEtcdUpdateStatus(t *testing.T) {
 | 
					func TestEtcdUpdateStatus(t *testing.T) {
 | 
				
			||||||
	storage, _, statusStorage, fakeClient := newStorage(t)
 | 
						storage, _, statusStorage, fakeClient, _ := newStorage(t)
 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
						fakeClient.TestIndex = true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -30,11 +30,17 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
						"k8s.io/kubernetes/pkg/runtime"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Defined in pkg/registry/node/etcd/etcd.go
 | 
				
			||||||
 | 
					type HostLocator interface {
 | 
				
			||||||
 | 
						HostKubeletPort(pod *api.Pod, ctx api.Context) (int, error)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// LogREST implements the log endpoint for a Pod
 | 
					// LogREST implements the log endpoint for a Pod
 | 
				
			||||||
// TODO: move me into pod/rest - I'm generic to store type via ResourceGetter
 | 
					// TODO: move me into pod/rest - I'm generic to store type via ResourceGetter
 | 
				
			||||||
type LogREST struct {
 | 
					type LogREST struct {
 | 
				
			||||||
	Store       *etcdgeneric.Etcd
 | 
						HostLocator HostLocator
 | 
				
			||||||
	KubeletConn client.ConnectionInfoGetter
 | 
						KubeletConn client.ConnectionInfoGetter
 | 
				
			||||||
 | 
						Store       *etcdgeneric.Etcd
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// LogREST implements GetterWithOptions
 | 
					// LogREST implements GetterWithOptions
 | 
				
			||||||
@@ -55,10 +61,13 @@ func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtim
 | 
				
			|||||||
	if errs := validation.ValidatePodLogOptions(logOpts); len(errs) > 0 {
 | 
						if errs := validation.ValidatePodLogOptions(logOpts); len(errs) > 0 {
 | 
				
			||||||
		return nil, errors.NewInvalid("podlogs", name, errs)
 | 
							return nil, errors.NewInvalid("podlogs", name, errs)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	location, transport, err := pod.LogLocation(r.Store, r.KubeletConn, ctx, name, logOpts)
 | 
						location, transport, err := pod.LogLocation(r.Store, r.KubeletConn, ctx, name, logOpts, r.HostLocator)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if location.Host == "" {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("Empty location.Host in %#v", location)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return &genericrest.LocationStreamer{
 | 
						return &genericrest.LocationStreamer{
 | 
				
			||||||
		Location:        location,
 | 
							Location:        location,
 | 
				
			||||||
		Transport:       transport,
 | 
							Transport:       transport,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -31,6 +31,7 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/fields"
 | 
						"k8s.io/kubernetes/pkg/fields"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/labels"
 | 
						"k8s.io/kubernetes/pkg/labels"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/registry/generic"
 | 
						"k8s.io/kubernetes/pkg/registry/generic"
 | 
				
			||||||
 | 
						nodeetcd "k8s.io/kubernetes/pkg/registry/node/etcd"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
						"k8s.io/kubernetes/pkg/runtime"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/util"
 | 
						"k8s.io/kubernetes/pkg/util"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/util/fielderrors"
 | 
						"k8s.io/kubernetes/pkg/util/fielderrors"
 | 
				
			||||||
@@ -228,7 +229,14 @@ func ResourceLocation(getter ResourceGetter, rt http.RoundTripper, ctx api.Conte
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// LogLocation returns the log URL for a pod container. If opts.Container is blank
 | 
					// LogLocation returns the log URL for a pod container. If opts.Container is blank
 | 
				
			||||||
// and only one container is present in the pod, that container is used.
 | 
					// and only one container is present in the pod, that container is used.
 | 
				
			||||||
func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodLogOptions) (*url.URL, http.RoundTripper, error) {
 | 
					func LogLocation(
 | 
				
			||||||
 | 
						getter ResourceGetter,
 | 
				
			||||||
 | 
						connInfo client.ConnectionInfoGetter,
 | 
				
			||||||
 | 
						ctx api.Context,
 | 
				
			||||||
 | 
						name string,
 | 
				
			||||||
 | 
						opts *api.PodLogOptions,
 | 
				
			||||||
 | 
						hostLocator nodeetcd.HostLocator,
 | 
				
			||||||
 | 
					) (*url.URL, http.RoundTripper, error) {
 | 
				
			||||||
	pod, err := getPod(getter, ctx, name)
 | 
						pod, err := getPod(getter, ctx, name)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, err
 | 
							return nil, nil, err
 | 
				
			||||||
@@ -252,6 +260,13 @@ func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ct
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, err
 | 
							return nil, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						daemonPort, err := hostLocator.HostKubeletPort(pod, ctx)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if daemonPort > 0 {
 | 
				
			||||||
 | 
							nodePort = uint(daemonPort)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	params := url.Values{}
 | 
						params := url.Values{}
 | 
				
			||||||
	if opts.Follow {
 | 
						if opts.Follow {
 | 
				
			||||||
		params.Add("follow", "true")
 | 
							params.Add("follow", "true")
 | 
				
			||||||
@@ -322,17 +337,40 @@ func streamParams(params url.Values, opts runtime.Object) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// AttachLocation returns the attach URL for a pod container. If opts.Container is blank
 | 
					// AttachLocation returns the attach URL for a pod container. If opts.Container is blank
 | 
				
			||||||
// and only one container is present in the pod, that container is used.
 | 
					// and only one container is present in the pod, that container is used.
 | 
				
			||||||
func AttachLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodAttachOptions) (*url.URL, http.RoundTripper, error) {
 | 
					func AttachLocation(
 | 
				
			||||||
	return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "attach")
 | 
						getter ResourceGetter,
 | 
				
			||||||
 | 
						connInfo client.ConnectionInfoGetter,
 | 
				
			||||||
 | 
						ctx api.Context,
 | 
				
			||||||
 | 
						name string,
 | 
				
			||||||
 | 
						opts *api.PodAttachOptions,
 | 
				
			||||||
 | 
						hostLocator nodeetcd.HostLocator,
 | 
				
			||||||
 | 
					) (*url.URL, http.RoundTripper, error) {
 | 
				
			||||||
 | 
						return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "attach", hostLocator)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ExecLocation returns the exec URL for a pod container. If opts.Container is blank
 | 
					// ExecLocation returns the exec URL for a pod container. If opts.Container is blank
 | 
				
			||||||
// and only one container is present in the pod, that container is used.
 | 
					// and only one container is present in the pod, that container is used.
 | 
				
			||||||
func ExecLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodExecOptions) (*url.URL, http.RoundTripper, error) {
 | 
					func ExecLocation(
 | 
				
			||||||
	return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "exec")
 | 
						getter ResourceGetter,
 | 
				
			||||||
 | 
						connInfo client.ConnectionInfoGetter,
 | 
				
			||||||
 | 
						ctx api.Context,
 | 
				
			||||||
 | 
						name string,
 | 
				
			||||||
 | 
						opts *api.PodExecOptions,
 | 
				
			||||||
 | 
						hostLocator nodeetcd.HostLocator,
 | 
				
			||||||
 | 
					) (*url.URL, http.RoundTripper, error) {
 | 
				
			||||||
 | 
						return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "exec", hostLocator)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func streamLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts runtime.Object, container, path string) (*url.URL, http.RoundTripper, error) {
 | 
					func streamLocation(
 | 
				
			||||||
 | 
						getter ResourceGetter,
 | 
				
			||||||
 | 
						connInfo client.ConnectionInfoGetter,
 | 
				
			||||||
 | 
						ctx api.Context,
 | 
				
			||||||
 | 
						name string,
 | 
				
			||||||
 | 
						opts runtime.Object,
 | 
				
			||||||
 | 
						container,
 | 
				
			||||||
 | 
						path string,
 | 
				
			||||||
 | 
						hostLocator nodeetcd.HostLocator,
 | 
				
			||||||
 | 
					) (*url.URL, http.RoundTripper, error) {
 | 
				
			||||||
	pod, err := getPod(getter, ctx, name)
 | 
						pod, err := getPod(getter, ctx, name)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, err
 | 
							return nil, nil, err
 | 
				
			||||||
@@ -355,6 +393,13 @@ func streamLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter,
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, err
 | 
							return nil, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						daemonPort, err := hostLocator.HostKubeletPort(pod, ctx)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if daemonPort > 0 {
 | 
				
			||||||
 | 
							nodePort = uint(daemonPort)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	params := url.Values{}
 | 
						params := url.Values{}
 | 
				
			||||||
	if err := streamParams(params, opts); err != nil {
 | 
						if err := streamParams(params, opts); err != nil {
 | 
				
			||||||
		return nil, nil, err
 | 
							return nil, nil, err
 | 
				
			||||||
@@ -369,7 +414,13 @@ func streamLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter,
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// PortForwardLocation returns the port-forward URL for a pod.
 | 
					// PortForwardLocation returns the port-forward URL for a pod.
 | 
				
			||||||
func PortForwardLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string) (*url.URL, http.RoundTripper, error) {
 | 
					func PortForwardLocation(
 | 
				
			||||||
 | 
						getter ResourceGetter,
 | 
				
			||||||
 | 
						connInfo client.ConnectionInfoGetter,
 | 
				
			||||||
 | 
						ctx api.Context,
 | 
				
			||||||
 | 
						name string,
 | 
				
			||||||
 | 
						hostLocator nodeetcd.HostLocator,
 | 
				
			||||||
 | 
					) (*url.URL, http.RoundTripper, error) {
 | 
				
			||||||
	pod, err := getPod(getter, ctx, name)
 | 
						pod, err := getPod(getter, ctx, name)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, err
 | 
							return nil, nil, err
 | 
				
			||||||
@@ -384,6 +435,13 @@ func PortForwardLocation(getter ResourceGetter, connInfo client.ConnectionInfoGe
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, err
 | 
							return nil, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						daemonPort, err := hostLocator.HostKubeletPort(pod, ctx)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if daemonPort > 0 {
 | 
				
			||||||
 | 
							nodePort = uint(daemonPort)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	loc := &url.URL{
 | 
						loc := &url.URL{
 | 
				
			||||||
		Scheme: nodeScheme,
 | 
							Scheme: nodeScheme,
 | 
				
			||||||
		Host:   fmt.Sprintf("%s:%d", nodeHost, nodePort),
 | 
							Host:   fmt.Sprintf("%s:%d", nodeHost, nodePort),
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user