mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	make Kubelet bootstrap certificate signal aware
This commit is contained in:
		@@ -256,12 +256,12 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API
 | 
			
		||||
			// add the kubelet config controller to kubeletDeps
 | 
			
		||||
			kubeletDeps.KubeletConfigController = kubeletConfigController
 | 
			
		||||
 | 
			
		||||
			// set up stopCh here in order to be reused by kubelet and docker shim
 | 
			
		||||
			stopCh := genericapiserver.SetupSignalHandler()
 | 
			
		||||
			// set up signal context here in order to be reused by kubelet and docker shim
 | 
			
		||||
			ctx := genericapiserver.SetupSignalContext()
 | 
			
		||||
 | 
			
		||||
			// run the kubelet
 | 
			
		||||
			klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
 | 
			
		||||
			if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
 | 
			
		||||
			if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
 | 
			
		||||
				klog.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
		},
 | 
			
		||||
@@ -403,7 +403,7 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
 | 
			
		||||
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
 | 
			
		||||
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
 | 
			
		||||
// not be generated.
 | 
			
		||||
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
 | 
			
		||||
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
 | 
			
		||||
	logOption := logs.NewOptions()
 | 
			
		||||
	logOption.LogFormat = s.Logging.Format
 | 
			
		||||
	logOption.Apply()
 | 
			
		||||
@@ -412,7 +412,7 @@ func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
 | 
			
		||||
	if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
 | 
			
		||||
		return fmt.Errorf("failed OS init: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
 | 
			
		||||
	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to run Kubelet: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
@@ -469,7 +469,7 @@ func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
 | 
			
		||||
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
 | 
			
		||||
	// Set global feature gates based on the value on the initial KubeletServer
 | 
			
		||||
	err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -552,7 +552,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
 | 
			
		||||
		klog.Warningf("standalone mode, no API client")
 | 
			
		||||
 | 
			
		||||
	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
 | 
			
		||||
		clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
 | 
			
		||||
		clientConfig, closeAllConns, err := buildKubeletClientConfig(ctx, s, nodeName)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
@@ -597,7 +597,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		kubeDeps.Auth = auth
 | 
			
		||||
		runAuthenticatorCAReload(stopCh)
 | 
			
		||||
		runAuthenticatorCAReload(ctx.Done())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var cgroupRoots []string
 | 
			
		||||
@@ -799,7 +799,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
 | 
			
		||||
	select {
 | 
			
		||||
	case <-done:
 | 
			
		||||
		break
 | 
			
		||||
	case <-stopCh:
 | 
			
		||||
	case <-ctx.Done():
 | 
			
		||||
		break
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -808,7 +808,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
 | 
			
		||||
 | 
			
		||||
// buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
 | 
			
		||||
// bootstrapping is enabled or client certificate rotation is enabled.
 | 
			
		||||
func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
 | 
			
		||||
func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
 | 
			
		||||
	if s.RotateCertificates {
 | 
			
		||||
		// Rules for client rotation and the handling of kube config files:
 | 
			
		||||
		//
 | 
			
		||||
@@ -878,7 +878,7 @@ func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(s.BootstrapKubeconfig) > 0 {
 | 
			
		||||
		if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
 | 
			
		||||
		if err := bootstrap.LoadClientCert(ctx, s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
 | 
			
		||||
			return nil, nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -105,7 +105,7 @@ func LoadClientConfig(kubeconfigPath, bootstrapPath, certDir string) (certConfig
 | 
			
		||||
// The kubeconfig at bootstrapPath is used to request a client certificate from the API server.
 | 
			
		||||
// On success, a kubeconfig file referencing the generated key and obtained certificate is written to kubeconfigPath.
 | 
			
		||||
// The certificate and key file are stored in certDir.
 | 
			
		||||
func LoadClientCert(kubeconfigPath, bootstrapPath, certDir string, nodeName types.NodeName) error {
 | 
			
		||||
func LoadClientCert(ctx context.Context, kubeconfigPath, bootstrapPath, certDir string, nodeName types.NodeName) error {
 | 
			
		||||
	// Short-circuit if the kubeconfig file exists and is valid.
 | 
			
		||||
	ok, err := isClientConfigStillValid(kubeconfigPath)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -156,11 +156,11 @@ func LoadClientCert(kubeconfigPath, bootstrapPath, certDir string, nodeName type
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := waitForServer(*bootstrapClientConfig, 1*time.Minute); err != nil {
 | 
			
		||||
	if err := waitForServer(ctx, *bootstrapClientConfig, 1*time.Minute); err != nil {
 | 
			
		||||
		klog.Warningf("Error waiting for apiserver to come up: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	certData, err := requestNodeCertificate(bootstrapClient, keyData, nodeName)
 | 
			
		||||
	certData, err := requestNodeCertificate(ctx, bootstrapClient, keyData, nodeName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
@@ -278,7 +278,7 @@ func verifyKeyData(data []byte) bool {
 | 
			
		||||
	return err == nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func waitForServer(cfg restclient.Config, deadline time.Duration) error {
 | 
			
		||||
func waitForServer(ctx context.Context, cfg restclient.Config, deadline time.Duration) error {
 | 
			
		||||
	cfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
 | 
			
		||||
	cfg.Timeout = 1 * time.Second
 | 
			
		||||
	cli, err := restclient.UnversionedRESTClientFor(&cfg)
 | 
			
		||||
@@ -286,12 +286,12 @@ func waitForServer(cfg restclient.Config, deadline time.Duration) error {
 | 
			
		||||
		return fmt.Errorf("couldn't create client: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx, cancel := context.WithTimeout(context.TODO(), deadline)
 | 
			
		||||
	ctx, cancel := context.WithTimeout(ctx, deadline)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	var connected bool
 | 
			
		||||
	wait.JitterUntil(func() {
 | 
			
		||||
		if _, err := cli.Get().AbsPath("/healthz").Do(context.TODO()).Raw(); err != nil {
 | 
			
		||||
		if _, err := cli.Get().AbsPath("/healthz").Do(ctx).Raw(); err != nil {
 | 
			
		||||
			klog.Infof("Failed to connect to apiserver: %v", err)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
@@ -312,7 +312,7 @@ func waitForServer(cfg restclient.Config, deadline time.Duration) error {
 | 
			
		||||
// certificate (pem-encoded). If there is any errors, or the watch timeouts, it
 | 
			
		||||
// will return an error. This is intended for use on nodes (kubelet and
 | 
			
		||||
// kubeadm).
 | 
			
		||||
func requestNodeCertificate(client clientset.Interface, privateKeyData []byte, nodeName types.NodeName) (certData []byte, err error) {
 | 
			
		||||
func requestNodeCertificate(ctx context.Context, client clientset.Interface, privateKeyData []byte, nodeName types.NodeName) (certData []byte, err error) {
 | 
			
		||||
	subject := &pkix.Name{
 | 
			
		||||
		Organization: []string{"system:nodes"},
 | 
			
		||||
		CommonName:   "system:node:" + string(nodeName),
 | 
			
		||||
@@ -349,7 +349,7 @@ func requestNodeCertificate(client clientset.Interface, privateKeyData []byte, n
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx, cancel := context.WithTimeout(context.Background(), 3600*time.Second)
 | 
			
		||||
	ctx, cancel := context.WithTimeout(ctx, 3600*time.Second)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	klog.V(2).Infof("Waiting for client certificate to be issued")
 | 
			
		||||
 
 | 
			
		||||
@@ -17,6 +17,7 @@ limitations under the License.
 | 
			
		||||
package bootstrap
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"os"
 | 
			
		||||
@@ -95,7 +96,7 @@ users:
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestRequestNodeCertificateNoKeyData(t *testing.T) {
 | 
			
		||||
	certData, err := requestNodeCertificate(newClientset(fakeClient{}), []byte{}, "fake-node-name")
 | 
			
		||||
	certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), []byte{}, "fake-node-name")
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		t.Errorf("Got no error, wanted error an error because there was an empty private key passed in.")
 | 
			
		||||
	}
 | 
			
		||||
@@ -113,7 +114,7 @@ func TestRequestNodeCertificateErrorCreatingCSR(t *testing.T) {
 | 
			
		||||
		t.Fatalf("Unable to generate a new private key: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	certData, err := requestNodeCertificate(client, privateKeyData, "fake-node-name")
 | 
			
		||||
	certData, err := requestNodeCertificate(context.TODO(), client, privateKeyData, "fake-node-name")
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		t.Errorf("Got no error, wanted error an error because client.Create failed.")
 | 
			
		||||
	}
 | 
			
		||||
@@ -128,7 +129,7 @@ func TestRequestNodeCertificate(t *testing.T) {
 | 
			
		||||
		t.Fatalf("Unable to generate a new private key: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	certData, err := requestNodeCertificate(newClientset(fakeClient{}), privateKeyData, "fake-node-name")
 | 
			
		||||
	certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), privateKeyData, "fake-node-name")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("Got %v, wanted no error.", err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -17,6 +17,7 @@ limitations under the License.
 | 
			
		||||
package server
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/signal"
 | 
			
		||||
)
 | 
			
		||||
@@ -27,21 +28,30 @@ var shutdownHandler chan os.Signal
 | 
			
		||||
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
 | 
			
		||||
// which is closed on one of these signals. If a second signal is caught, the program
 | 
			
		||||
// is terminated with exit code 1.
 | 
			
		||||
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
 | 
			
		||||
// be called once.
 | 
			
		||||
func SetupSignalHandler() <-chan struct{} {
 | 
			
		||||
	return SetupSignalContext().Done()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned.
 | 
			
		||||
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
 | 
			
		||||
// be called once.
 | 
			
		||||
func SetupSignalContext() context.Context {
 | 
			
		||||
	close(onlyOneSignalHandler) // panics when called twice
 | 
			
		||||
 | 
			
		||||
	shutdownHandler = make(chan os.Signal, 2)
 | 
			
		||||
 | 
			
		||||
	stop := make(chan struct{})
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	signal.Notify(shutdownHandler, shutdownSignals...)
 | 
			
		||||
	go func() {
 | 
			
		||||
		<-shutdownHandler
 | 
			
		||||
		close(stop)
 | 
			
		||||
		cancel()
 | 
			
		||||
		<-shutdownHandler
 | 
			
		||||
		os.Exit(1) // second signal. Exit directly.
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return stop
 | 
			
		||||
	return ctx
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user