Merge pull request #42911 from deads2k/server-04-combined

Automatic merge from submit-queue (batch tested with PRs 43694, 41262, 42911)

combine kube-apiserver and kube-aggregator

This combines several pulls currently in progress and wires them together.  The aggregator sits in front of the normal kube-apiserver and allows local fallthrough instead of proxying.

@kubernetes/sig-api-machinery-misc 
@DirectXMan12 since you seem invested, your life will get easier
@luxas FYI since you've started trying to wire something together.  



Dependent Pulls LGTM:
- [x] https://github.com/kubernetes/kubernetes/pull/42801
- [x] https://github.com/kubernetes/kubernetes/pull/42886
- [x] https://github.com/kubernetes/kubernetes/pull/42900
- [x] https://github.com/kubernetes/kubernetes/pull/42732
- [x] https://github.com/kubernetes/kubernetes/pull/42672
- [x] https://github.com/kubernetes/kubernetes/pull/43141
- [x] https://github.com/kubernetes/kubernetes/pull/43076
- [x] https://github.com/kubernetes/kubernetes/pull/43149
- [x] https://github.com/kubernetes/kubernetes/pull/43226
- [x] https://github.com/kubernetes/kubernetes/pull/43144
This commit is contained in:
Kubernetes Submit Queue
2017-03-27 09:30:24 -07:00
committed by GitHub
28 changed files with 18558 additions and 84 deletions

View File

@@ -17,6 +17,7 @@ limitations under the License.
package main
import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
)
@@ -32,7 +33,7 @@ func NewKubeAPIServer() *Server {
SimpleUsage: "apiserver",
Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.",
Run: func(_ *Server, args []string) error {
return app.Run(s)
return app.Run(s, wait.NeverStop)
},
}
s.AddFlags(hks.Flags())

View File

@@ -25,6 +25,7 @@ go_library(
"//pkg/version/prometheus:go_default_library",
"//pkg/version/verflag:go_default_library",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/util/flag",
"//vendor:k8s.io/apiserver/pkg/util/logs",
],

View File

@@ -24,6 +24,7 @@ import (
"os"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
@@ -47,7 +48,7 @@ func main() {
verflag.PrintAndExitIfRequested()
if err := app.Run(s); err != nil {
if err := app.Run(s, wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}

View File

@@ -10,6 +10,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"aggregator.go",
"plugins.go",
"server.go",
],
@@ -32,6 +33,7 @@ go_library(
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/kubeapiserver/authenticator:go_default_library",
"//pkg/master:go_default_library",
"//pkg/master/thirdparty:go_default_library",
"//pkg/master/tunneler:go_default_library",
"//pkg/registry/cachesize:go_default_library",
"//pkg/version:go_default_library",
@@ -62,6 +64,7 @@ go_library(
"//vendor:github.com/spf13/cobra",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/openapi",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
@@ -73,7 +76,15 @@ go_library(
"//vendor:k8s.io/apiserver/pkg/authorization/authorizer",
"//vendor:k8s.io/apiserver/pkg/server",
"//vendor:k8s.io/apiserver/pkg/server/filters",
"//vendor:k8s.io/apiserver/pkg/server/healthz",
"//vendor:k8s.io/apiserver/pkg/server/mux",
"//vendor:k8s.io/apiserver/pkg/server/options",
"//vendor:k8s.io/apiserver/pkg/server/storage",
"//vendor:k8s.io/client-go/kubernetes",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration",
"//vendor:k8s.io/kube-aggregator/pkg/apiserver",
"//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion",
"//vendor:k8s.io/kube-aggregator/pkg/controllers/autoregister",
],
)

View File

@@ -0,0 +1,189 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app does all of the work necessary to create a Kubernetes
// APIServer by binding together the API, master and APIServer infrastructure.
// It can be configured and called directly or via the hyperkube framework.
package app
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
genericoptions "k8s.io/apiserver/pkg/server/options"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/master/thirdparty"
)
func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions) (*aggregatorapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator
genericConfig := kubeAPIServerConfig
genericConfig.FallThroughHandler = mux.NewPathRecorderMux()
// the aggregator doesn't wire these up. It just delegates them to the kubeapiserver
genericConfig.EnableSwaggerUI = false
genericConfig.OpenAPIConfig = nil
genericConfig.SwaggerConfig = nil
// copy the loopbackclientconfig. We're going to change the contenttype back to json until we get protobuf serializations for it
t := *kubeAPIServerConfig.LoopbackClientConfig
genericConfig.LoopbackClientConfig = &t
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = ""
// copy the etcd options so we don't mutate originals.
etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Codec = aggregatorapiserver.Codecs.LegacyCodec(schema.GroupVersion{Group: "apiregistration.k8s.io", Version: "v1alpha1"})
etcdOptions.StorageConfig.Copier = aggregatorapiserver.Scheme
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
client, err := kubeclientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
var certBytes, keyBytes []byte
if len(commandOptions.ProxyClientCertFile) > 0 && len(commandOptions.ProxyClientKeyFile) > 0 {
certBytes, err = ioutil.ReadFile(commandOptions.ProxyClientCertFile)
if err != nil {
return nil, err
}
keyBytes, err = ioutil.ReadFile(commandOptions.ProxyClientKeyFile)
if err != nil {
return nil, err
}
}
aggregatorConfig := &aggregatorapiserver.Config{
GenericConfig: &genericConfig,
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
}
return aggregatorConfig, nil
}
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer, stopCh)
if err != nil {
return nil, err
}
// create controllers for auto-registration
apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices(), apiRegistrationClient)
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
tprRegistrationController := thirdparty.NewAutoRegistrationController(sharedInformers.Extensions().InternalVersion().ThirdPartyResources(), autoRegistrationController)
aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go autoRegistrationController.Run(5, stopCh)
go tprRegistrationController.Run(5, stopCh)
return nil
})
aggregatorServer.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("autoregister-completion", func(r *http.Request) error {
items, err := aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices().Lister().List(labels.Everything())
if err != nil {
return err
}
missing := []apiregistration.APIService{}
for _, apiService := range apiServices {
found := false
for _, item := range items {
if item.Name == apiService.Name {
found = true
break
}
}
if !found {
missing = append(missing, *apiService)
}
}
if len(missing) > 0 {
return fmt.Errorf("missing APIService: %v", missing)
}
return nil
}))
return aggregatorServer, nil
}
func makeAPIService(gv schema.GroupVersion) *apiregistration.APIService {
return &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
Spec: apiregistration.APIServiceSpec{
Group: gv.Group,
Version: gv.Version,
Priority: 100,
},
}
}
func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*apiregistration.APIService {
apiServices := []*apiregistration.APIService{}
for _, curr := range delegateAPIServer.ListedPaths() {
if curr == "/api/v1" {
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
registration.AddAPIServiceToSync(apiService)
apiServices = append(apiServices, apiService)
continue
}
if !strings.HasPrefix(curr, "/apis/") {
continue
}
// this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1
tokens := strings.Split(curr, "/")
if len(tokens) != 4 {
continue
}
apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
// TODO this is probably an indication that we need explicit and precise control over the discovery chain
// but for now its a special case
// apps has to come last for compatibility with 1.5 kubectl clients
if apiService.Spec.Group == "apps" {
apiService.Spec.Priority = 110
}
registration.AddAPIServiceToSync(apiService)
apiServices = append(apiServices, apiService)
}
return apiServices
}

View File

@@ -63,6 +63,9 @@ type ServerRunOptions struct {
ServiceNodePortRange utilnet.PortRange
SSHKeyfile string
SSHUser string
ProxyClientCertFile string
ProxyClientKeyFile string
}
// NewServerRunOptions creates a new ServerRunOptions object with default parameters
@@ -200,4 +203,10 @@ func (s *ServerRunOptions) AddFlags(fs *pflag.FlagSet) {
"If true, server will do its best to fix the update request to pass the validation, "+
"e.g., setting empty UID in update request to its existing value. This flag can be turned off "+
"after we fix all the clients that send malformed updates.")
fs.StringVar(&s.ProxyClientCertFile, "proxy-client-cert-file", s.ProxyClientCertFile,
"client certificate used to prove the identity of the aggragator or kube-apiserver when it proxies requests to a user api-server")
fs.StringVar(&s.ProxyClientKeyFile, "proxy-client-key-file", s.ProxyClientKeyFile,
"client certificate key used to prove the identity of the aggragator or kube-apiserver when it proxies requests to a user api-server")
}

View File

@@ -43,7 +43,6 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authentication/authenticator"
@@ -51,6 +50,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/cmd/kube-apiserver/app/preflight"
"k8s.io/kubernetes/pkg/api"
@@ -94,31 +94,52 @@ cluster's shared state through which all other components interact.`,
}
// Run runs the specified APIServer. This should never exit.
func Run(s *options.ServerRunOptions) error {
config, sharedInformers, err := BuildMasterConfig(s)
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
kubeAPIServerConfig, sharedInformers, err := CreateKubeAPIServerConfig(runOptions)
if err != nil {
return err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers, stopCh)
if err != nil {
return err
}
return RunServer(config, sharedInformers, wait.NeverStop)
// if we're starting up a hacked up version of this API server for a weird test case,
// just start the API server as is because clients don't get built correctly when you do this
if len(os.Getenv("KUBE_API_VERSIONS")) > 0 {
return kubeAPIServer.GenericAPIServer.PrepareRun().Run(stopCh)
}
// otherwise go down the normal path of standing the aggregator up in front of the API server
// this wires up openapi
kubeAPIServer.GenericAPIServer.PrepareRun()
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions)
if err != nil {
return err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, stopCh)
if err != nil {
return err
}
return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)
}
// RunServer uses the provided config and shared informers to run the apiserver. It does not return.
func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error {
m, err := config.Complete().New()
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New()
if err != nil {
return err
return nil, err
}
m.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
kubeAPIServer.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
sharedInformers.Start(stopCh)
return nil
})
return m.GenericAPIServer.PrepareRun().Run(stopCh)
return kubeAPIServer, nil
}
// BuildMasterConfig creates all the resources for running the API server, but runs none of them
func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) {
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) {
// set defaults in the options before trying to create the generic config
if err := defaultOptions(s); err != nil {
return nil, nil, err