mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #39716 from zhouhaibing089/etcd-health-check
Automatic merge from submit-queue
etcd component status check should include credentials
- [x] Add TLS credentials into `pkg/genericapiserver.Backend`.
- [x] Add TLS credentials into `pkg/registry/core/componentstatus.Server`.
- [x] `pkg/probe/http.httpProber` should accept the TLS credentials.
Now it is working.
```console
$ kubectl get cs
NAME                 STATUS    MESSAGE              ERROR
scheduler            Healthy   ok
controller-manager   Healthy   ok
etcd-0               Healthy   {"health": "true"}
```
Fixes https://github.com/kubernetes/kubernetes/issues/27343.
			
			
This commit is contained in:
		@@ -32,7 +32,12 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func New() HTTPProber {
 | 
					func New() HTTPProber {
 | 
				
			||||||
	tlsConfig := &tls.Config{InsecureSkipVerify: true}
 | 
						tlsConfig := &tls.Config{InsecureSkipVerify: true}
 | 
				
			||||||
	transport := utilnet.SetTransportDefaults(&http.Transport{TLSClientConfig: tlsConfig, DisableKeepAlives: true})
 | 
						return NewWithTLSConfig(tlsConfig)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewWithTLSConfig takes tls config as parameter.
 | 
				
			||||||
 | 
					func NewWithTLSConfig(config *tls.Config) HTTPProber {
 | 
				
			||||||
 | 
						transport := utilnet.SetTransportDefaults(&http.Transport{TLSClientConfig: config, DisableKeepAlives: true})
 | 
				
			||||||
	return httpProber{transport}
 | 
						return httpProber{transport}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -27,19 +27,16 @@ import (
 | 
				
			|||||||
	"k8s.io/apiserver/pkg/registry/rest"
 | 
						"k8s.io/apiserver/pkg/registry/rest"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api"
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/probe"
 | 
						"k8s.io/kubernetes/pkg/probe"
 | 
				
			||||||
	httpprober "k8s.io/kubernetes/pkg/probe/http"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type REST struct {
 | 
					type REST struct {
 | 
				
			||||||
	GetServersToValidate func() map[string]Server
 | 
						GetServersToValidate func() map[string]*Server
 | 
				
			||||||
	prober               httpprober.HTTPProber
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewStorage returns a new REST.
 | 
					// NewStorage returns a new REST.
 | 
				
			||||||
func NewStorage(serverRetriever func() map[string]Server) *REST {
 | 
					func NewStorage(serverRetriever func() map[string]*Server) *REST {
 | 
				
			||||||
	return &REST{
 | 
						return &REST{
 | 
				
			||||||
		GetServersToValidate: serverRetriever,
 | 
							GetServersToValidate: serverRetriever,
 | 
				
			||||||
		prober:               httpprober.New(),
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -60,7 +57,7 @@ func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion
 | 
				
			|||||||
	wait.Add(len(servers))
 | 
						wait.Add(len(servers))
 | 
				
			||||||
	statuses := make(chan api.ComponentStatus, len(servers))
 | 
						statuses := make(chan api.ComponentStatus, len(servers))
 | 
				
			||||||
	for k, v := range servers {
 | 
						for k, v := range servers {
 | 
				
			||||||
		go func(name string, server Server) {
 | 
							go func(name string, server *Server) {
 | 
				
			||||||
			defer wait.Done()
 | 
								defer wait.Done()
 | 
				
			||||||
			status := rs.getComponentStatus(name, server)
 | 
								status := rs.getComponentStatus(name, server)
 | 
				
			||||||
			statuses <- *status
 | 
								statuses <- *status
 | 
				
			||||||
@@ -97,8 +94,8 @@ func ToConditionStatus(s probe.Result) api.ConditionStatus {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (rs *REST) getComponentStatus(name string, server Server) *api.ComponentStatus {
 | 
					func (rs *REST) getComponentStatus(name string, server *Server) *api.ComponentStatus {
 | 
				
			||||||
	status, msg, err := server.DoServerCheck(rs.prober)
 | 
						status, msg, err := server.DoServerCheck()
 | 
				
			||||||
	errorMsg := ""
 | 
						errorMsg := ""
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		errorMsg = err.Error()
 | 
							errorMsg = err.Error()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -50,17 +50,17 @@ type testResponse struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewTestREST(resp testResponse) *REST {
 | 
					func NewTestREST(resp testResponse) *REST {
 | 
				
			||||||
 | 
						prober := &fakeHttpProber{
 | 
				
			||||||
 | 
							result: resp.result,
 | 
				
			||||||
 | 
							body:   resp.data,
 | 
				
			||||||
 | 
							err:    resp.err,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return &REST{
 | 
						return &REST{
 | 
				
			||||||
		GetServersToValidate: func() map[string]Server {
 | 
							GetServersToValidate: func() map[string]*Server {
 | 
				
			||||||
			return map[string]Server{
 | 
								return map[string]*Server{
 | 
				
			||||||
				"test1": {Addr: "testserver1", Port: 8000, Path: "/healthz"},
 | 
									"test1": {Addr: "testserver1", Port: 8000, Path: "/healthz", Prober: prober},
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		prober: &fakeHttpProber{
 | 
					 | 
				
			||||||
			result: resp.result,
 | 
					 | 
				
			||||||
			body:   resp.data,
 | 
					 | 
				
			||||||
			err:    resp.err,
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,8 +17,9 @@ limitations under the License.
 | 
				
			|||||||
package componentstatus
 | 
					package componentstatus
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"crypto/tls"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	utilnet "k8s.io/apimachinery/pkg/util/net"
 | 
						utilnet "k8s.io/apimachinery/pkg/util/net"
 | 
				
			||||||
@@ -42,7 +43,10 @@ type Server struct {
 | 
				
			|||||||
	Port        int
 | 
						Port        int
 | 
				
			||||||
	Path        string
 | 
						Path        string
 | 
				
			||||||
	EnableHTTPS bool
 | 
						EnableHTTPS bool
 | 
				
			||||||
 | 
						TLSConfig   *tls.Config
 | 
				
			||||||
	Validate    ValidatorFn
 | 
						Validate    ValidatorFn
 | 
				
			||||||
 | 
						Prober      httpprober.HTTPProber
 | 
				
			||||||
 | 
						Once        sync.Once
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type ServerStatus struct {
 | 
					type ServerStatus struct {
 | 
				
			||||||
@@ -58,14 +62,22 @@ type ServerStatus struct {
 | 
				
			|||||||
	Err string `json:"err,omitempty"`
 | 
						Err string `json:"err,omitempty"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (server *Server) DoServerCheck(prober httpprober.HTTPProber) (probe.Result, string, error) {
 | 
					func (server *Server) DoServerCheck() (probe.Result, string, error) {
 | 
				
			||||||
 | 
						// setup the prober
 | 
				
			||||||
 | 
						server.Once.Do(func() {
 | 
				
			||||||
 | 
							if server.Prober != nil {
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							server.Prober = httpprober.NewWithTLSConfig(server.TLSConfig)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	scheme := "http"
 | 
						scheme := "http"
 | 
				
			||||||
	if server.EnableHTTPS {
 | 
						if server.EnableHTTPS {
 | 
				
			||||||
		scheme = "https"
 | 
							scheme = "https"
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	url := utilnet.FormatURL(scheme, server.Addr, server.Port, server.Path)
 | 
						url := utilnet.FormatURL(scheme, server.Addr, server.Port, server.Path)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	result, data, err := prober.Probe(url, nil, probeTimeOut)
 | 
						result, data, err := server.Prober.Probe(url, nil, probeTimeOut)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return probe.Unknown, "", err
 | 
							return probe.Unknown, "", err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -59,7 +59,8 @@ func TestValidate(t *testing.T) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		s.Validate = test.validator
 | 
							s.Validate = test.validator
 | 
				
			||||||
		result, data, err := s.DoServerCheck(fakeProber)
 | 
							s.Prober = fakeProber
 | 
				
			||||||
 | 
							result, data, err := s.DoServerCheck()
 | 
				
			||||||
		if test.expectErr && err == nil {
 | 
							if test.expectErr && err == nil {
 | 
				
			||||||
			t.Error("unexpected non-error")
 | 
								t.Error("unexpected non-error")
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -15,6 +15,7 @@ go_test(
 | 
				
			|||||||
    tags = ["automanaged"],
 | 
					    tags = ["automanaged"],
 | 
				
			||||||
    deps = [
 | 
					    deps = [
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
 | 
				
			||||||
 | 
					        "//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
 | 
				
			||||||
    ],
 | 
					    ],
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -242,14 +242,14 @@ type componentStatusStorage struct {
 | 
				
			|||||||
	storageFactory serverstorage.StorageFactory
 | 
						storageFactory serverstorage.StorageFactory
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server {
 | 
					func (s componentStatusStorage) serversToValidate() map[string]*componentstatus.Server {
 | 
				
			||||||
	serversToValidate := map[string]componentstatus.Server{
 | 
						serversToValidate := map[string]*componentstatus.Server{
 | 
				
			||||||
		"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
 | 
							"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
 | 
				
			||||||
		"scheduler":          {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
 | 
							"scheduler":          {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for ix, machine := range s.storageFactory.Backends() {
 | 
						for ix, machine := range s.storageFactory.Backends() {
 | 
				
			||||||
		etcdUrl, err := url.Parse(machine)
 | 
							etcdUrl, err := url.Parse(machine.Server)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			glog.Errorf("Failed to parse etcd url for validation: %v", err)
 | 
								glog.Errorf("Failed to parse etcd url for validation: %v", err)
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
@@ -269,9 +269,10 @@ func (s componentStatusStorage) serversToValidate() map[string]componentstatus.S
 | 
				
			|||||||
			port = 2379
 | 
								port = 2379
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		// TODO: etcd health checking should be abstracted in the storage tier
 | 
							// TODO: etcd health checking should be abstracted in the storage tier
 | 
				
			||||||
		serversToValidate[fmt.Sprintf("etcd-%d", ix)] = componentstatus.Server{
 | 
							serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.Server{
 | 
				
			||||||
			Addr:        addr,
 | 
								Addr:        addr,
 | 
				
			||||||
			EnableHTTPS: etcdUrl.Scheme == "https",
 | 
								EnableHTTPS: etcdUrl.Scheme == "https",
 | 
				
			||||||
 | 
								TLSConfig:   machine.TLSConfig,
 | 
				
			||||||
			Port:        port,
 | 
								Port:        port,
 | 
				
			||||||
			Path:        "/health",
 | 
								Path:        "/health",
 | 
				
			||||||
			Validate:    etcdutil.EtcdHealthCheck,
 | 
								Validate:    etcdutil.EtcdHealthCheck,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,6 +20,7 @@ import (
 | 
				
			|||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						"k8s.io/apimachinery/pkg/runtime/schema"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/server/storage"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/storage/storagebackend"
 | 
						"k8s.io/apiserver/pkg/storage/storagebackend"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -47,6 +48,6 @@ func (f fakeStorageFactory) ResourcePrefix(groupResource schema.GroupResource) s
 | 
				
			|||||||
	return ""
 | 
						return ""
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f fakeStorageFactory) Backends() []string {
 | 
					func (f fakeStorageFactory) Backends() []storage.Backend {
 | 
				
			||||||
	return []string{"etcd-0"}
 | 
						return []storage.Backend{{Server: "etcd-0"}}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,6 +17,9 @@ limitations under the License.
 | 
				
			|||||||
package storage
 | 
					package storage
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"crypto/tls"
 | 
				
			||||||
 | 
						"crypto/x509"
 | 
				
			||||||
 | 
						"io/ioutil"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/golang/glog"
 | 
						"github.com/golang/glog"
 | 
				
			||||||
@@ -27,6 +30,15 @@ import (
 | 
				
			|||||||
	"k8s.io/apiserver/pkg/storage/storagebackend"
 | 
						"k8s.io/apiserver/pkg/storage/storagebackend"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Backend describes the storage servers, the information here should be enough
 | 
				
			||||||
 | 
					// for health validations.
 | 
				
			||||||
 | 
					type Backend struct {
 | 
				
			||||||
 | 
						// the url of storage backend like: https://etcd.domain:2379
 | 
				
			||||||
 | 
						Server string
 | 
				
			||||||
 | 
						// the required tls config
 | 
				
			||||||
 | 
						TLSConfig *tls.Config
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// StorageFactory is the interface to locate the storage for a given GroupResource
 | 
					// StorageFactory is the interface to locate the storage for a given GroupResource
 | 
				
			||||||
type StorageFactory interface {
 | 
					type StorageFactory interface {
 | 
				
			||||||
	// New finds the storage destination for the given group and resource. It will
 | 
						// New finds the storage destination for the given group and resource. It will
 | 
				
			||||||
@@ -40,7 +52,7 @@ type StorageFactory interface {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Backends gets all backends for all registered storage destinations.
 | 
						// Backends gets all backends for all registered storage destinations.
 | 
				
			||||||
	// Used for getting all instances for health validations.
 | 
						// Used for getting all instances for health validations.
 | 
				
			||||||
	Backends() []string
 | 
						Backends() []Backend
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// DefaultStorageFactory takes a GroupResource and returns back its storage interface.  This result includes:
 | 
					// DefaultStorageFactory takes a GroupResource and returns back its storage interface.  This result includes:
 | 
				
			||||||
@@ -252,15 +264,45 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
 | 
				
			|||||||
	return &storageConfig, nil
 | 
						return &storageConfig, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Get all backends for all registered storage destinations.
 | 
					// Backends returns all backends for all registered storage destinations.
 | 
				
			||||||
// Used for getting all instances for health validations.
 | 
					// Used for getting all instances for health validations.
 | 
				
			||||||
func (s *DefaultStorageFactory) Backends() []string {
 | 
					func (s *DefaultStorageFactory) Backends() []Backend {
 | 
				
			||||||
	backends := sets.NewString(s.StorageConfig.ServerList...)
 | 
						servers := sets.NewString(s.StorageConfig.ServerList...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, overrides := range s.Overrides {
 | 
						for _, overrides := range s.Overrides {
 | 
				
			||||||
		backends.Insert(overrides.etcdLocation...)
 | 
							servers.Insert(overrides.etcdLocation...)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return backends.List()
 | 
					
 | 
				
			||||||
 | 
						tlsConfig := &tls.Config{
 | 
				
			||||||
 | 
							InsecureSkipVerify: true,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(s.StorageConfig.CertFile) > 0 && len(s.StorageConfig.KeyFile) > 0 {
 | 
				
			||||||
 | 
							cert, err := tls.LoadX509KeyPair(s.StorageConfig.CertFile, s.StorageConfig.KeyFile)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								glog.Errorf("failed to load key pair while getting backends: %s", err)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								tlsConfig.Certificates = []tls.Certificate{cert}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(s.StorageConfig.CAFile) > 0 {
 | 
				
			||||||
 | 
							if caCert, err := ioutil.ReadFile(s.StorageConfig.CAFile); err != nil {
 | 
				
			||||||
 | 
								glog.Errorf("failed to read ca file while getting backends: %s", err)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								caPool := x509.NewCertPool()
 | 
				
			||||||
 | 
								caPool.AppendCertsFromPEM(caCert)
 | 
				
			||||||
 | 
								tlsConfig.RootCAs = caPool
 | 
				
			||||||
 | 
								tlsConfig.InsecureSkipVerify = false
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						backends := []Backend{}
 | 
				
			||||||
 | 
						for server := range servers {
 | 
				
			||||||
 | 
							backends = append(backends, Backend{
 | 
				
			||||||
 | 
								Server:    server,
 | 
				
			||||||
 | 
								TLSConfig: tlsConfig,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return backends
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *DefaultStorageFactory) ResourcePrefix(groupResource schema.GroupResource) string {
 | 
					func (s *DefaultStorageFactory) ResourcePrefix(groupResource schema.GroupResource) string {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user