mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #842 from smarterclayton/add_api_groups
Extract RESTHandler and allow API groupings
This commit is contained in:
		@@ -25,6 +25,7 @@ import (
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
 | 
			
		||||
@@ -114,5 +115,13 @@ func main() {
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	glog.Fatal(m.Run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix))
 | 
			
		||||
	storage, codec := m.API_v1beta1()
 | 
			
		||||
	s := &http.Server{
 | 
			
		||||
		Addr:           net.JoinHostPort(*address, strconv.Itoa(int(*port))),
 | 
			
		||||
		Handler:        apiserver.Handle(storage, codec, *apiPrefix),
 | 
			
		||||
		ReadTimeout:    10 * time.Second,
 | 
			
		||||
		WriteTimeout:   10 * time.Second,
 | 
			
		||||
		MaxHeaderBytes: 1 << 20,
 | 
			
		||||
	}
 | 
			
		||||
	glog.Fatal(s.ListenAndServe())
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -29,6 +29,7 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
 | 
			
		||||
@@ -86,11 +87,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
 | 
			
		||||
	machineList := []string{"localhost", "machine"}
 | 
			
		||||
 | 
			
		||||
	handler := delegateHandler{}
 | 
			
		||||
	apiserver := httptest.NewServer(&handler)
 | 
			
		||||
	apiServer := httptest.NewServer(&handler)
 | 
			
		||||
 | 
			
		||||
	etcdClient := etcd.NewClient(servers)
 | 
			
		||||
 | 
			
		||||
	cl := client.New(apiserver.URL, nil)
 | 
			
		||||
	cl := client.New(apiServer.URL, nil)
 | 
			
		||||
	cl.PollPeriod = time.Second * 1
 | 
			
		||||
	cl.Sync = true
 | 
			
		||||
 | 
			
		||||
@@ -101,7 +102,8 @@ func startComponents(manifestURL string) (apiServerURL string) {
 | 
			
		||||
		Minions:       machineList,
 | 
			
		||||
		PodInfoGetter: fakePodInfoGetter{},
 | 
			
		||||
	})
 | 
			
		||||
	handler.delegate = m.ConstructHandler("/api/v1beta1")
 | 
			
		||||
	storage, codec := m.API_v1beta1()
 | 
			
		||||
	handler.delegate = apiserver.Handle(storage, codec, "/api/v1beta1")
 | 
			
		||||
 | 
			
		||||
	controllerManager := controller.MakeReplicationManager(cl)
 | 
			
		||||
 | 
			
		||||
@@ -130,7 +132,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
 | 
			
		||||
		kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251)
 | 
			
		||||
	}, 0)
 | 
			
		||||
 | 
			
		||||
	return apiserver.URL
 | 
			
		||||
	return apiServer.URL
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func runReplicationControllerTest(kubeClient *client.Client) {
 | 
			
		||||
 
 | 
			
		||||
@@ -21,15 +21,12 @@ import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"path"
 | 
			
		||||
	"runtime/debug"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
)
 | 
			
		||||
@@ -42,215 +39,104 @@ type Codec interface {
 | 
			
		||||
	DecodeInto(data []byte, obj interface{}) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// APIServer is an HTTPHandler that delegates to RESTStorage objects.
 | 
			
		||||
// It handles URLs of the form:
 | 
			
		||||
// ${prefix}/${storage_key}[/${object_name}]
 | 
			
		||||
// Where 'prefix' is an arbitrary string, and 'storage_key' points to a RESTStorage object stored in storage.
 | 
			
		||||
//
 | 
			
		||||
// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing.
 | 
			
		||||
type APIServer struct {
 | 
			
		||||
	storage     map[string]RESTStorage
 | 
			
		||||
	codec       Codec
 | 
			
		||||
	ops         *Operations
 | 
			
		||||
	asyncOpWait time.Duration
 | 
			
		||||
	handler     http.Handler
 | 
			
		||||
// mux is an object that can register http handlers
 | 
			
		||||
type mux interface {
 | 
			
		||||
	Handle(pattern string, handler http.Handler)
 | 
			
		||||
	HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New creates a new APIServer object. 'storage' contains a map of handlers. 'codec'
 | 
			
		||||
// is an interface for decoding to and from JSON. 'prefix' is the hosting path prefix.
 | 
			
		||||
// defaultAPIServer exposes nested objects for testability
 | 
			
		||||
type defaultAPIServer struct {
 | 
			
		||||
	http.Handler
 | 
			
		||||
	group *APIGroup
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Handle returns a Handler function that expose the provided storage interfaces
 | 
			
		||||
// as RESTful resources at prefix, serialized by codec, and also includes the support
 | 
			
		||||
// http resources.
 | 
			
		||||
func Handle(storage map[string]RESTStorage, codec Codec, prefix string) http.Handler {
 | 
			
		||||
	group := NewAPIGroup(storage, codec)
 | 
			
		||||
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
	group.InstallREST(mux, prefix)
 | 
			
		||||
	InstallSupport(mux)
 | 
			
		||||
 | 
			
		||||
	return &defaultAPIServer{RecoverPanics(mux), group}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// APIGroup is a http.Handler that exposes multiple RESTStorage objects
 | 
			
		||||
// It handles URLs of the form:
 | 
			
		||||
// /${storage_key}[/${object_name}]
 | 
			
		||||
// Where 'storage_key' points to a RESTStorage object stored in storage.
 | 
			
		||||
//
 | 
			
		||||
// The codec will be used to decode the request body into an object pointer returned by
 | 
			
		||||
// RESTStorage.New().  The Create() and Update() methods should cast their argument to
 | 
			
		||||
// the type returned by New().
 | 
			
		||||
// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing.
 | 
			
		||||
type APIGroup struct {
 | 
			
		||||
	handler RESTHandler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewAPIGroup returns an object that will serve a set of REST resources and their
 | 
			
		||||
// associated operations.  The provided codec controls serialization and deserialization.
 | 
			
		||||
// This is a helper method for registering multiple sets of REST handlers under different
 | 
			
		||||
// prefixes onto a server.
 | 
			
		||||
// TODO: add multitype codec serialization
 | 
			
		||||
func New(storage map[string]RESTStorage, codec Codec, prefix string) *APIServer {
 | 
			
		||||
	s := &APIServer{
 | 
			
		||||
func NewAPIGroup(storage map[string]RESTStorage, codec Codec) *APIGroup {
 | 
			
		||||
	return &APIGroup{RESTHandler{
 | 
			
		||||
		storage: storage,
 | 
			
		||||
		codec:   codec,
 | 
			
		||||
		ops:     NewOperations(),
 | 
			
		||||
		// Delay just long enough to handle most simple write operations
 | 
			
		||||
		asyncOpWait: time.Millisecond * 25,
 | 
			
		||||
	}}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// InstallREST registers the REST handlers (storage, watch, and operations) into a mux.
 | 
			
		||||
// It is expected that the provided prefix will serve all operations. Path MUST NOT end
 | 
			
		||||
// in a slash.
 | 
			
		||||
func (g *APIGroup) InstallREST(mux mux, paths ...string) {
 | 
			
		||||
	restHandler := &g.handler
 | 
			
		||||
	watchHandler := &WatchHandler{g.handler.storage, g.handler.codec}
 | 
			
		||||
	opHandler := &OperationHandler{g.handler.ops, g.handler.codec}
 | 
			
		||||
 | 
			
		||||
	for _, prefix := range paths {
 | 
			
		||||
		prefix = strings.TrimRight(prefix, "/")
 | 
			
		||||
		mux.Handle(prefix+"/", http.StripPrefix(prefix, restHandler))
 | 
			
		||||
		mux.Handle(prefix+"/watch/", http.StripPrefix(prefix+"/watch/", watchHandler))
 | 
			
		||||
		mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler))
 | 
			
		||||
		mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
 | 
			
		||||
	prefix = strings.TrimRight(prefix, "/")
 | 
			
		||||
 | 
			
		||||
	// Primary API handlers
 | 
			
		||||
	restPrefix := prefix + "/"
 | 
			
		||||
	mux.Handle(restPrefix, http.StripPrefix(restPrefix, http.HandlerFunc(s.handleREST)))
 | 
			
		||||
 | 
			
		||||
	// Watch API handlers
 | 
			
		||||
	watchPrefix := path.Join(prefix, "watch") + "/"
 | 
			
		||||
	mux.Handle(watchPrefix, http.StripPrefix(watchPrefix, &WatchHandler{storage, codec}))
 | 
			
		||||
 | 
			
		||||
	// Support services for the apiserver
 | 
			
		||||
	logsPrefix := "/logs/"
 | 
			
		||||
	mux.Handle(logsPrefix, http.StripPrefix(logsPrefix, http.FileServer(http.Dir("/var/log/"))))
 | 
			
		||||
// InstallSupport registers the APIServer support functions into a mux.
 | 
			
		||||
func InstallSupport(mux mux) {
 | 
			
		||||
	healthz.InstallHandler(mux)
 | 
			
		||||
	mux.Handle("/logs/", http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))))
 | 
			
		||||
	mux.Handle("/proxy/minion/", http.StripPrefix("/proxy/minion", http.HandlerFunc(handleProxyMinion)))
 | 
			
		||||
	mux.HandleFunc("/version", handleVersion)
 | 
			
		||||
	mux.HandleFunc("/", handleIndex)
 | 
			
		||||
 | 
			
		||||
	// Handle both operations and operations/* with the same handler
 | 
			
		||||
	handler := &OperationHandler{s.ops, s.codec}
 | 
			
		||||
	operationPrefix := path.Join(prefix, "operations")
 | 
			
		||||
	mux.Handle(operationPrefix, http.StripPrefix(operationPrefix, handler))
 | 
			
		||||
	operationsPrefix := operationPrefix + "/"
 | 
			
		||||
	mux.Handle(operationsPrefix, http.StripPrefix(operationsPrefix, handler))
 | 
			
		||||
 | 
			
		||||
	// Proxy minion requests
 | 
			
		||||
	mux.Handle("/proxy/minion/", http.StripPrefix("/proxy/minion", http.HandlerFunc(handleProxyMinion)))
 | 
			
		||||
 | 
			
		||||
	s.handler = mux
 | 
			
		||||
 | 
			
		||||
	return s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ServeHTTP implements the standard net/http interface.
 | 
			
		||||
func (s *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if x := recover(); x != nil {
 | 
			
		||||
			w.WriteHeader(http.StatusInternalServerError)
 | 
			
		||||
			fmt.Fprint(w, "apis panic. Look in log for details.")
 | 
			
		||||
			glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack())
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	defer httplog.MakeLogged(req, &w).StacktraceWhen(
 | 
			
		||||
		httplog.StatusIsNot(
 | 
			
		||||
			http.StatusOK,
 | 
			
		||||
			http.StatusAccepted,
 | 
			
		||||
			http.StatusConflict,
 | 
			
		||||
			http.StatusNotFound,
 | 
			
		||||
		),
 | 
			
		||||
	).Log()
 | 
			
		||||
 | 
			
		||||
	// Dispatch to the internal handler
 | 
			
		||||
	s.handler.ServeHTTP(w, req)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// handleREST handles requests to all our RESTStorage objects.
 | 
			
		||||
func (s *APIServer) handleREST(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
	parts := splitPath(req.URL.Path)
 | 
			
		||||
	if len(parts) < 1 {
 | 
			
		||||
		notFound(w, req)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	storage := s.storage[parts[0]]
 | 
			
		||||
	if storage == nil {
 | 
			
		||||
		httplog.LogOf(w).Addf("'%v' has no storage object", parts[0])
 | 
			
		||||
		notFound(w, req)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.handleRESTStorage(parts, req, w, storage)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// handleRESTStorage is the main dispatcher for a storage object.  It switches on the HTTP method, and then
 | 
			
		||||
// on path length, according to the following table:
 | 
			
		||||
//   Method     Path          Action
 | 
			
		||||
//   GET        /foo          list
 | 
			
		||||
//   GET        /foo/bar      get 'bar'
 | 
			
		||||
//   POST       /foo          create
 | 
			
		||||
//   PUT        /foo/bar      update 'bar'
 | 
			
		||||
//   DELETE     /foo/bar      delete 'bar'
 | 
			
		||||
// Returns 404 if the method/pattern doesn't match one of these entries
 | 
			
		||||
// The s accepts several query parameters:
 | 
			
		||||
//    sync=[false|true] Synchronous request (only applies to create, update, delete operations)
 | 
			
		||||
//    timeout=<duration> Timeout for synchronous requests, only applies if sync=true
 | 
			
		||||
//    labels=<label-selector> Used for filtering list operations
 | 
			
		||||
func (s *APIServer) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
 | 
			
		||||
	sync := req.URL.Query().Get("sync") == "true"
 | 
			
		||||
	timeout := parseTimeout(req.URL.Query().Get("timeout"))
 | 
			
		||||
	switch req.Method {
 | 
			
		||||
	case "GET":
 | 
			
		||||
		switch len(parts) {
 | 
			
		||||
		case 1:
 | 
			
		||||
			selector, err := labels.ParseSelector(req.URL.Query().Get("labels"))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				errorJSON(err, s.codec, w)
 | 
			
		||||
				return
 | 
			
		||||
// RecoverPanics wraps an http Handler to recover and log panics
 | 
			
		||||
func RecoverPanics(handler http.Handler) http.Handler {
 | 
			
		||||
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
		defer func() {
 | 
			
		||||
			if x := recover(); x != nil {
 | 
			
		||||
				w.WriteHeader(http.StatusInternalServerError)
 | 
			
		||||
				fmt.Fprint(w, "apis panic. Look in log for details.")
 | 
			
		||||
				glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack())
 | 
			
		||||
			}
 | 
			
		||||
			list, err := storage.List(selector)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				errorJSON(err, s.codec, w)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			writeJSON(http.StatusOK, s.codec, list, w)
 | 
			
		||||
		case 2:
 | 
			
		||||
			item, err := storage.Get(parts[1])
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				errorJSON(err, s.codec, w)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			writeJSON(http.StatusOK, s.codec, item, w)
 | 
			
		||||
		default:
 | 
			
		||||
			notFound(w, req)
 | 
			
		||||
		}
 | 
			
		||||
		}()
 | 
			
		||||
		defer httplog.MakeLogged(req, &w).StacktraceWhen(
 | 
			
		||||
			httplog.StatusIsNot(
 | 
			
		||||
				http.StatusOK,
 | 
			
		||||
				http.StatusAccepted,
 | 
			
		||||
				http.StatusConflict,
 | 
			
		||||
				http.StatusNotFound,
 | 
			
		||||
			),
 | 
			
		||||
		).Log()
 | 
			
		||||
 | 
			
		||||
	case "POST":
 | 
			
		||||
		if len(parts) != 1 {
 | 
			
		||||
			notFound(w, req)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		body, err := readBody(req)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, s.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		obj := storage.New()
 | 
			
		||||
		err = s.codec.DecodeInto(body, obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, s.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		out, err := storage.Create(obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, s.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		op := s.createOperation(out, sync, timeout)
 | 
			
		||||
		s.finishReq(op, w)
 | 
			
		||||
 | 
			
		||||
	case "DELETE":
 | 
			
		||||
		if len(parts) != 2 {
 | 
			
		||||
			notFound(w, req)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		out, err := storage.Delete(parts[1])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, s.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		op := s.createOperation(out, sync, timeout)
 | 
			
		||||
		s.finishReq(op, w)
 | 
			
		||||
 | 
			
		||||
	case "PUT":
 | 
			
		||||
		if len(parts) != 2 {
 | 
			
		||||
			notFound(w, req)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		body, err := readBody(req)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, s.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		obj := storage.New()
 | 
			
		||||
		err = s.codec.DecodeInto(body, obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, s.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		out, err := storage.Update(obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, s.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		op := s.createOperation(out, sync, timeout)
 | 
			
		||||
		s.finishReq(op, w)
 | 
			
		||||
 | 
			
		||||
	default:
 | 
			
		||||
		notFound(w, req)
 | 
			
		||||
	}
 | 
			
		||||
		// Dispatch to the internal handler
 | 
			
		||||
		handler.ServeHTTP(w, req)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// handleVersionReq writes the server's version information.
 | 
			
		||||
@@ -258,40 +144,6 @@ func handleVersion(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
	writeRawJSON(http.StatusOK, version.Get(), w)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// createOperation creates an operation to process a channel response
 | 
			
		||||
func (s *APIServer) createOperation(out <-chan interface{}, sync bool, timeout time.Duration) *Operation {
 | 
			
		||||
	op := s.ops.NewOperation(out)
 | 
			
		||||
	if sync {
 | 
			
		||||
		op.WaitFor(timeout)
 | 
			
		||||
	} else if s.asyncOpWait != 0 {
 | 
			
		||||
		op.WaitFor(s.asyncOpWait)
 | 
			
		||||
	}
 | 
			
		||||
	return op
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
 | 
			
		||||
// Operation to receive the result and returning its ID down the writer.
 | 
			
		||||
func (s *APIServer) finishReq(op *Operation, w http.ResponseWriter) {
 | 
			
		||||
	obj, complete := op.StatusOrResult()
 | 
			
		||||
	if complete {
 | 
			
		||||
		status := http.StatusOK
 | 
			
		||||
		switch stat := obj.(type) {
 | 
			
		||||
		case api.Status:
 | 
			
		||||
			httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
 | 
			
		||||
			if stat.Code != 0 {
 | 
			
		||||
				status = stat.Code
 | 
			
		||||
			}
 | 
			
		||||
		case *api.Status:
 | 
			
		||||
			if stat.Code != 0 {
 | 
			
		||||
				status = stat.Code
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		writeJSON(status, s.codec, obj, w)
 | 
			
		||||
	} else {
 | 
			
		||||
		writeJSON(http.StatusAccepted, s.codec, obj, w)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// writeJSON renders an object as JSON to the response
 | 
			
		||||
func writeJSON(statusCode int, codec Codec, object interface{}, w http.ResponseWriter) {
 | 
			
		||||
	output, err := codec.Encode(object)
 | 
			
		||||
 
 | 
			
		||||
@@ -170,7 +170,7 @@ func TestNotFound(t *testing.T) {
 | 
			
		||||
		"watch missing storage":        {"GET", "/prefix/version/watch/"},
 | 
			
		||||
		"watch with bad method":        {"POST", "/prefix/version/watch/foo/bar"},
 | 
			
		||||
	}
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"foo": &SimpleRESTStorage{},
 | 
			
		||||
	}, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
@@ -193,7 +193,7 @@ func TestNotFound(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestVersion(t *testing.T) {
 | 
			
		||||
	handler := New(map[string]RESTStorage{}, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{}, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
	client := http.Client{}
 | 
			
		||||
 | 
			
		||||
@@ -222,7 +222,7 @@ func TestSimpleList(t *testing.T) {
 | 
			
		||||
	storage := map[string]RESTStorage{}
 | 
			
		||||
	simpleStorage := SimpleRESTStorage{}
 | 
			
		||||
	storage["simple"] = &simpleStorage
 | 
			
		||||
	handler := New(storage, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(storage, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	resp, err := http.Get(server.URL + "/prefix/version/simple")
 | 
			
		||||
@@ -241,7 +241,7 @@ func TestErrorList(t *testing.T) {
 | 
			
		||||
		errors: map[string]error{"list": fmt.Errorf("test Error")},
 | 
			
		||||
	}
 | 
			
		||||
	storage["simple"] = &simpleStorage
 | 
			
		||||
	handler := New(storage, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(storage, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	resp, err := http.Get(server.URL + "/prefix/version/simple")
 | 
			
		||||
@@ -265,7 +265,7 @@ func TestNonEmptyList(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	storage["simple"] = &simpleStorage
 | 
			
		||||
	handler := New(storage, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(storage, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	resp, err := http.Get(server.URL + "/prefix/version/simple")
 | 
			
		||||
@@ -300,7 +300,7 @@ func TestGet(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	storage["simple"] = &simpleStorage
 | 
			
		||||
	handler := New(storage, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(storage, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	resp, err := http.Get(server.URL + "/prefix/version/simple/id")
 | 
			
		||||
@@ -321,7 +321,7 @@ func TestGetMissing(t *testing.T) {
 | 
			
		||||
		errors: map[string]error{"get": NewNotFoundErr("simple", "id")},
 | 
			
		||||
	}
 | 
			
		||||
	storage["simple"] = &simpleStorage
 | 
			
		||||
	handler := New(storage, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(storage, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	resp, err := http.Get(server.URL + "/prefix/version/simple/id")
 | 
			
		||||
@@ -339,7 +339,7 @@ func TestDelete(t *testing.T) {
 | 
			
		||||
	simpleStorage := SimpleRESTStorage{}
 | 
			
		||||
	ID := "id"
 | 
			
		||||
	storage["simple"] = &simpleStorage
 | 
			
		||||
	handler := New(storage, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(storage, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	client := http.Client{}
 | 
			
		||||
@@ -361,7 +361,7 @@ func TestDeleteMissing(t *testing.T) {
 | 
			
		||||
		errors: map[string]error{"delete": NewNotFoundErr("simple", ID)},
 | 
			
		||||
	}
 | 
			
		||||
	storage["simple"] = &simpleStorage
 | 
			
		||||
	handler := New(storage, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(storage, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	client := http.Client{}
 | 
			
		||||
@@ -381,7 +381,7 @@ func TestUpdate(t *testing.T) {
 | 
			
		||||
	simpleStorage := SimpleRESTStorage{}
 | 
			
		||||
	ID := "id"
 | 
			
		||||
	storage["simple"] = &simpleStorage
 | 
			
		||||
	handler := New(storage, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(storage, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	item := Simple{
 | 
			
		||||
@@ -411,7 +411,7 @@ func TestUpdateMissing(t *testing.T) {
 | 
			
		||||
		errors: map[string]error{"update": NewNotFoundErr("simple", ID)},
 | 
			
		||||
	}
 | 
			
		||||
	storage["simple"] = &simpleStorage
 | 
			
		||||
	handler := New(storage, codec, "/prefix/version")
 | 
			
		||||
	handler := Handle(storage, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	item := Simple{
 | 
			
		||||
@@ -436,10 +436,10 @@ func TestUpdateMissing(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func TestCreate(t *testing.T) {
 | 
			
		||||
	simpleStorage := &SimpleRESTStorage{}
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"foo": simpleStorage,
 | 
			
		||||
	}, codec, "/prefix/version")
 | 
			
		||||
	handler.asyncOpWait = 0
 | 
			
		||||
	handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
	client := http.Client{}
 | 
			
		||||
 | 
			
		||||
@@ -473,7 +473,7 @@ func TestCreate(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCreateNotFound(t *testing.T) {
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"simple": &SimpleRESTStorage{
 | 
			
		||||
			// storage.Create can fail with not found error in theory.
 | 
			
		||||
			// See https://github.com/GoogleCloudPlatform/kubernetes/pull/486#discussion_r15037092.
 | 
			
		||||
@@ -519,7 +519,7 @@ func TestSyncCreate(t *testing.T) {
 | 
			
		||||
			return obj, nil
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"foo": &storage,
 | 
			
		||||
	}, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
@@ -621,8 +621,8 @@ func TestAsyncDelayReturnsError(t *testing.T) {
 | 
			
		||||
			return nil, NewAlreadyExistsErr("foo", "bar")
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	handler := New(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version")
 | 
			
		||||
	handler.asyncOpWait = time.Millisecond / 2
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version")
 | 
			
		||||
	handler.(*defaultAPIServer).group.handler.asyncOpWait = time.Millisecond / 2
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	status := expectApiStatus(t, "DELETE", fmt.Sprintf("%s/prefix/version/foo/bar", server.URL), nil, http.StatusConflict)
 | 
			
		||||
@@ -639,8 +639,8 @@ func TestAsyncCreateError(t *testing.T) {
 | 
			
		||||
			return nil, NewAlreadyExistsErr("foo", "bar")
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	handler := New(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version")
 | 
			
		||||
	handler.asyncOpWait = 0
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version")
 | 
			
		||||
	handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 | 
			
		||||
	simple := Simple{Name: "foo"}
 | 
			
		||||
@@ -728,7 +728,7 @@ func TestSyncCreateTimeout(t *testing.T) {
 | 
			
		||||
			return obj, nil
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"foo": &storage,
 | 
			
		||||
	}, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 
 | 
			
		||||
@@ -127,7 +127,7 @@ func TestApiServerMinionProxy(t *testing.T) {
 | 
			
		||||
	proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
		w.Write([]byte(req.URL.Path))
 | 
			
		||||
	}))
 | 
			
		||||
	server := httptest.NewServer(New(nil, nil, "/prefix"))
 | 
			
		||||
	server := httptest.NewServer(Handle(nil, nil, "/prefix"))
 | 
			
		||||
	proxy, _ := url.Parse(proxyServer.URL)
 | 
			
		||||
	resp, err := http.Get(fmt.Sprintf("%s/proxy/minion/%s%s", server.URL, proxy.Host, "/test"))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -93,10 +93,10 @@ func TestOperation(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func TestOperationsList(t *testing.T) {
 | 
			
		||||
	simpleStorage := &SimpleRESTStorage{}
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"foo": simpleStorage,
 | 
			
		||||
	}, codec, "/prefix/version")
 | 
			
		||||
	handler.asyncOpWait = 0
 | 
			
		||||
	handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
	client := http.Client{}
 | 
			
		||||
 | 
			
		||||
@@ -105,26 +105,26 @@ func TestOperationsList(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
	data, err := codec.Encode(simple)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	response, err := client.Post(server.URL+"/prefix/version/foo", "application/json", bytes.NewBuffer(data))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if response.StatusCode != http.StatusAccepted {
 | 
			
		||||
		t.Errorf("Unexpected response %#v", response)
 | 
			
		||||
		t.Fatalf("Unexpected response %#v", response)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	response, err = client.Get(server.URL + "/prefix/version/operations")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if response.StatusCode != http.StatusOK {
 | 
			
		||||
		t.Fatalf("unexpected status code %#v", response)
 | 
			
		||||
	}
 | 
			
		||||
	body, err := ioutil.ReadAll(response.Body)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	obj, err := codec.Decode(body)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -149,10 +149,10 @@ func TestOpGet(t *testing.T) {
 | 
			
		||||
			return obj, nil
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"foo": simpleStorage,
 | 
			
		||||
	}, codec, "/prefix/version")
 | 
			
		||||
	handler.asyncOpWait = 0
 | 
			
		||||
	handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
	client := http.Client{}
 | 
			
		||||
 | 
			
		||||
@@ -162,27 +162,27 @@ func TestOpGet(t *testing.T) {
 | 
			
		||||
	data, err := codec.Encode(simple)
 | 
			
		||||
	t.Log(string(data))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	response, err := client.Do(request)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if response.StatusCode != http.StatusAccepted {
 | 
			
		||||
		t.Errorf("Unexpected response %#v", response)
 | 
			
		||||
		t.Fatalf("Unexpected response %#v", response)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var itemOut api.Status
 | 
			
		||||
	body, err := extractBody(response, &itemOut)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
 | 
			
		||||
@@ -191,12 +191,12 @@ func TestOpGet(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details.ID, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = client.Do(req2)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if response.StatusCode != http.StatusAccepted {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										192
									
								
								pkg/apiserver/resthandler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										192
									
								
								pkg/apiserver/resthandler.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,192 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2014 Google Inc. All rights reserved.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package apiserver
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type RESTHandler struct {
 | 
			
		||||
	storage     map[string]RESTStorage
 | 
			
		||||
	codec       Codec
 | 
			
		||||
	ops         *Operations
 | 
			
		||||
	asyncOpWait time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ServeHTTP handles requests to all RESTStorage objects.
 | 
			
		||||
func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
	parts := splitPath(req.URL.Path)
 | 
			
		||||
	if len(parts) < 1 {
 | 
			
		||||
		notFound(w, req)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	storage := h.storage[parts[0]]
 | 
			
		||||
	if storage == nil {
 | 
			
		||||
		httplog.LogOf(w).Addf("'%v' has no storage object", parts[0])
 | 
			
		||||
		notFound(w, req)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	h.handleRESTStorage(parts, req, w, storage)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// handleRESTStorage is the main dispatcher for a storage object.  It switches on the HTTP method, and then
 | 
			
		||||
// on path length, according to the following table:
 | 
			
		||||
//   Method     Path          Action
 | 
			
		||||
//   GET        /foo          list
 | 
			
		||||
//   GET        /foo/bar      get 'bar'
 | 
			
		||||
//   POST       /foo          create
 | 
			
		||||
//   PUT        /foo/bar      update 'bar'
 | 
			
		||||
//   DELETE     /foo/bar      delete 'bar'
 | 
			
		||||
// Returns 404 if the method/pattern doesn't match one of these entries
 | 
			
		||||
// The s accepts several query parameters:
 | 
			
		||||
//    sync=[false|true] Synchronous request (only applies to create, update, delete operations)
 | 
			
		||||
//    timeout=<duration> Timeout for synchronous requests, only applies if sync=true
 | 
			
		||||
//    labels=<label-selector> Used for filtering list operations
 | 
			
		||||
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
 | 
			
		||||
	sync := req.URL.Query().Get("sync") == "true"
 | 
			
		||||
	timeout := parseTimeout(req.URL.Query().Get("timeout"))
 | 
			
		||||
	switch req.Method {
 | 
			
		||||
	case "GET":
 | 
			
		||||
		switch len(parts) {
 | 
			
		||||
		case 1:
 | 
			
		||||
			selector, err := labels.ParseSelector(req.URL.Query().Get("labels"))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				errorJSON(err, h.codec, w)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			list, err := storage.List(selector)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				errorJSON(err, h.codec, w)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			writeJSON(http.StatusOK, h.codec, list, w)
 | 
			
		||||
		case 2:
 | 
			
		||||
			item, err := storage.Get(parts[1])
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				errorJSON(err, h.codec, w)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			writeJSON(http.StatusOK, h.codec, item, w)
 | 
			
		||||
		default:
 | 
			
		||||
			notFound(w, req)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	case "POST":
 | 
			
		||||
		if len(parts) != 1 {
 | 
			
		||||
			notFound(w, req)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		body, err := readBody(req)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, h.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		obj := storage.New()
 | 
			
		||||
		err = h.codec.DecodeInto(body, obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, h.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		out, err := storage.Create(obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, h.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		op := h.createOperation(out, sync, timeout)
 | 
			
		||||
		h.finishReq(op, w)
 | 
			
		||||
 | 
			
		||||
	case "DELETE":
 | 
			
		||||
		if len(parts) != 2 {
 | 
			
		||||
			notFound(w, req)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		out, err := storage.Delete(parts[1])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, h.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		op := h.createOperation(out, sync, timeout)
 | 
			
		||||
		h.finishReq(op, w)
 | 
			
		||||
 | 
			
		||||
	case "PUT":
 | 
			
		||||
		if len(parts) != 2 {
 | 
			
		||||
			notFound(w, req)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		body, err := readBody(req)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, h.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		obj := storage.New()
 | 
			
		||||
		err = h.codec.DecodeInto(body, obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, h.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		out, err := storage.Update(obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			errorJSON(err, h.codec, w)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		op := h.createOperation(out, sync, timeout)
 | 
			
		||||
		h.finishReq(op, w)
 | 
			
		||||
 | 
			
		||||
	default:
 | 
			
		||||
		notFound(w, req)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// createOperation creates an operation to process a channel response
 | 
			
		||||
func (h *RESTHandler) createOperation(out <-chan interface{}, sync bool, timeout time.Duration) *Operation {
 | 
			
		||||
	op := h.ops.NewOperation(out)
 | 
			
		||||
	if sync {
 | 
			
		||||
		op.WaitFor(timeout)
 | 
			
		||||
	} else if h.asyncOpWait != 0 {
 | 
			
		||||
		op.WaitFor(h.asyncOpWait)
 | 
			
		||||
	}
 | 
			
		||||
	return op
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
 | 
			
		||||
// Operation to receive the result and returning its ID down the writer.
 | 
			
		||||
func (h *RESTHandler) finishReq(op *Operation, w http.ResponseWriter) {
 | 
			
		||||
	obj, complete := op.StatusOrResult()
 | 
			
		||||
	if complete {
 | 
			
		||||
		status := http.StatusOK
 | 
			
		||||
		switch stat := obj.(type) {
 | 
			
		||||
		case api.Status:
 | 
			
		||||
			httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
 | 
			
		||||
			if stat.Code != 0 {
 | 
			
		||||
				status = stat.Code
 | 
			
		||||
			}
 | 
			
		||||
		case *api.Status:
 | 
			
		||||
			if stat.Code != 0 {
 | 
			
		||||
				status = stat.Code
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		writeJSON(status, h.codec, obj, w)
 | 
			
		||||
	} else {
 | 
			
		||||
		writeJSON(http.StatusAccepted, h.codec, obj, w)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -41,7 +41,7 @@ var watchTestTable = []struct {
 | 
			
		||||
func TestWatchWebsocket(t *testing.T) {
 | 
			
		||||
	simpleStorage := &SimpleRESTStorage{}
 | 
			
		||||
	_ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work.
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"foo": simpleStorage,
 | 
			
		||||
	}, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
@@ -87,7 +87,7 @@ func TestWatchWebsocket(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func TestWatchHTTP(t *testing.T) {
 | 
			
		||||
	simpleStorage := &SimpleRESTStorage{}
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"foo": simpleStorage,
 | 
			
		||||
	}, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
@@ -144,7 +144,7 @@ func TestWatchHTTP(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func TestWatchParamParsing(t *testing.T) {
 | 
			
		||||
	simpleStorage := &SimpleRESTStorage{}
 | 
			
		||||
	handler := New(map[string]RESTStorage{
 | 
			
		||||
	handler := Handle(map[string]RESTStorage{
 | 
			
		||||
		"foo": simpleStorage,
 | 
			
		||||
	}, codec, "/prefix/version")
 | 
			
		||||
	server := httptest.NewServer(handler)
 | 
			
		||||
 
 | 
			
		||||
@@ -20,6 +20,11 @@ import (
 | 
			
		||||
	"net/http"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// mux is an interface describing the methods InstallHandler requires.
 | 
			
		||||
type mux interface {
 | 
			
		||||
	HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	http.HandleFunc("/healthz", handleHealthz)
 | 
			
		||||
}
 | 
			
		||||
@@ -31,6 +36,6 @@ func handleHealthz(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// InstallHandler registers a handler for health checking on the path "/healthz" to mux.
 | 
			
		||||
func InstallHandler(mux *http.ServeMux) {
 | 
			
		||||
func InstallHandler(mux mux) {
 | 
			
		||||
	mux.HandleFunc("/healthz", handleHealthz)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -139,21 +139,11 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Run begins serving the Kubernetes API. It never returns.
 | 
			
		||||
func (m *Master) Run(myAddress, apiPrefix string) error {
 | 
			
		||||
	s := &http.Server{
 | 
			
		||||
		Addr:           myAddress,
 | 
			
		||||
		Handler:        m.ConstructHandler(apiPrefix),
 | 
			
		||||
		ReadTimeout:    10 * time.Second,
 | 
			
		||||
		WriteTimeout:   10 * time.Second,
 | 
			
		||||
		MaxHeaderBytes: 1 << 20,
 | 
			
		||||
// API_v1beta1 returns the resources and codec for API version v1beta1
 | 
			
		||||
func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, apiserver.Codec) {
 | 
			
		||||
	storage := make(map[string]apiserver.RESTStorage)
 | 
			
		||||
	for k, v := range m.storage {
 | 
			
		||||
		storage[k] = v
 | 
			
		||||
	}
 | 
			
		||||
	return s.ListenAndServe()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ConstructHandler returns an http.Handler which serves the Kubernetes API.
 | 
			
		||||
// Instead of calling Run, you can call this function to get a handler for your own server.
 | 
			
		||||
// It is intended for testing. Only call once.
 | 
			
		||||
func (m *Master) ConstructHandler(apiPrefix string) http.Handler {
 | 
			
		||||
	return apiserver.New(m.storage, api.Codec, apiPrefix)
 | 
			
		||||
	return storage, api.Codec
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user