fix: close the apid connection to other machines gracefully

Fixes #8552

When `apid` notices an update in the PKI, it flushes its client connections
to other machines (used for proxying), as it might need to use a new
client certificate.

While flushing, just calling `Close` might abort already running
connections.

So instead, try to close gracefully with a timeout when the connection
is idle.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
This commit is contained in:
Andrey Smirnov
2024-04-08 19:45:44 +04:00
parent ff2c427b04
commit 336e611746

View File

@@ -15,6 +15,7 @@ import (
"github.com/siderolabs/net" "github.com/siderolabs/net"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/backoff" "google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@@ -26,6 +27,11 @@ import (
"github.com/siderolabs/talos/pkg/machinery/proto" "github.com/siderolabs/talos/pkg/machinery/proto"
) )
// GracefulShutdownTimeout is the timeout for graceful shutdown of the backend connection.
//
// Talos has a few long-running API calls, so we need to give the backend some time to finish them.
//
// (*APID).Close passes this value to gracefulGRPCClose as the upper bound on how long
// to wait before the connection is closed anyway.
const GracefulShutdownTimeout = 30 * time.Minute
var _ proxy.Backend = (*APID)(nil) var _ proxy.Backend = (*APID)(nil)
// APID backend performs proxying to another apid instance. // APID backend performs proxying to another apid instance.
@@ -253,7 +259,36 @@ func (a *APID) Close() {
defer a.mu.Unlock() defer a.mu.Unlock()
if a.conn != nil { if a.conn != nil {
a.conn.Close() //nolint:errcheck gracefulGRPCClose(a.conn, GracefulShutdownTimeout)
a.conn = nil a.conn = nil
} }
} }
// gracefulGRPCClose closes conn in the background, trying to avoid closing the connection
// while it is in the middle of a call (e.g. streaming API).
//
// The function returns immediately; the actual close happens in a goroutine which:
//   - closes the connection as soon as it is observed in the Idle, Shutdown or
//     TransientFailure state;
//   - otherwise waits for the connection state to change and checks again;
//   - closes the connection unconditionally once timeout has elapsed.
//
// See https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
// for details on connection states.
func gracefulGRPCClose(conn *grpc.ClientConn, timeout time.Duration) {
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()

		// keep checking the state for as long as the timeout has NOT expired;
		// ctx.Err() is nil until the deadline passes
		for ctx.Err() == nil {
			switch state := conn.GetState(); state { //nolint:exhaustive
			case connectivity.Idle,
				connectivity.Shutdown,
				connectivity.TransientFailure:
				// close immediately, connection is not used
				conn.Close() //nolint:errcheck

				return
			default:
				// wait for state change of the connection; this also returns when ctx expires,
				// in which case the loop condition terminates the loop
				conn.WaitForStateChange(ctx, state)
			}
		}

		// close anyways on timeout
		conn.Close() //nolint:errcheck
	}()
}