mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-11-02 11:38:02 +00:00
Allow overriding gRPC's connection timeout with VAULT_GRPC_MIN_CONNECT_TIMEOUT (#19676)
This commit is contained in:
4
changelog/19676.txt
Normal file
4
changelog/19676.txt
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
```release-note:improvement
|
||||||
|
core: Allow overriding gRPC connect timeout via VAULT_GRPC_MIN_CONNECT_TIMEOUT. This is an env var rather than a config setting because we don't expect this to ever be needed. It's being added as a last-ditch
|
||||||
|
option in case all else fails for some replication issues we may not have fully reproduced.
|
||||||
|
```
|
||||||
@@ -330,7 +330,8 @@ func (c *Core) startClusterListener(ctx context.Context) error {
|
|||||||
c.clusterListener.Store(cluster.NewListener(networkLayer,
|
c.clusterListener.Store(cluster.NewListener(networkLayer,
|
||||||
c.clusterCipherSuites,
|
c.clusterCipherSuites,
|
||||||
listenerLogger,
|
listenerLogger,
|
||||||
5*c.clusterHeartbeatInterval))
|
5*c.clusterHeartbeatInterval,
|
||||||
|
c.grpcMinConnectTimeout))
|
||||||
|
|
||||||
c.AddLogger(listenerLogger)
|
c.AddLogger(listenerLogger)
|
||||||
|
|
||||||
|
|||||||
@@ -75,9 +75,10 @@ type Listener struct {
|
|||||||
logger log.Logger
|
logger log.Logger
|
||||||
l sync.RWMutex
|
l sync.RWMutex
|
||||||
tlsConnectionLoggingLevel log.Level
|
tlsConnectionLoggingLevel log.Level
|
||||||
|
grpcMinConnectTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout time.Duration) *Listener {
|
func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout, grpcMinConnectTimeout time.Duration) *Listener {
|
||||||
var maxStreams uint32 = math.MaxUint32
|
var maxStreams uint32 = math.MaxUint32
|
||||||
if override := os.Getenv("VAULT_GRPC_MAX_STREAMS"); override != "" {
|
if override := os.Getenv("VAULT_GRPC_MAX_STREAMS"); override != "" {
|
||||||
i, err := strconv.ParseUint(override, 10, 32)
|
i, err := strconv.ParseUint(override, 10, 32)
|
||||||
@@ -114,6 +115,7 @@ func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Lo
|
|||||||
cipherSuites: cipherSuites,
|
cipherSuites: cipherSuites,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
tlsConnectionLoggingLevel: log.LevelFromString(os.Getenv("VAULT_CLUSTER_TLS_SESSION_LOG_LEVEL")),
|
tlsConnectionLoggingLevel: log.LevelFromString(os.Getenv("VAULT_CLUSTER_TLS_SESSION_LOG_LEVEL")),
|
||||||
|
grpcMinConnectTimeout: grpcMinConnectTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -464,10 +466,21 @@ func (cl *Listener) GetDialerFunc(ctx context.Context, alpn string) func(string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
tlsConfig.NextProtos = []string{alpn}
|
tlsConfig.NextProtos = []string{alpn}
|
||||||
cl.logger.Debug("creating rpc dialer", "address", addr, "alpn", alpn, "host", tlsConfig.ServerName)
|
args := []interface{}{
|
||||||
|
"address", addr,
|
||||||
|
"alpn", alpn,
|
||||||
|
"host", tlsConfig.ServerName,
|
||||||
|
"timeout", fmt.Sprintf("%s", timeout),
|
||||||
|
}
|
||||||
|
if cl.grpcMinConnectTimeout != 0 {
|
||||||
|
args = append(args, "timeout_env_override", fmt.Sprintf("%s", cl.grpcMinConnectTimeout))
|
||||||
|
}
|
||||||
|
cl.logger.Debug("creating rpc dialer", args...)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
conn, err := cl.networkLayer.Dial(addr, timeout, tlsConfig)
|
conn, err := cl.networkLayer.Dial(addr, timeout, tlsConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cl.logger.Debug("dial failure", "address", addr, "alpn", alpn, "host", tlsConfig.ServerName, "duration", fmt.Sprintf("%s", time.Since(start)), "error", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cl.logTLSSessionStart(conn.RemoteAddr().String(), conn.ConnectionState())
|
cl.logTLSSessionStart(conn.RemoteAddr().String(), conn.ConnectionState())
|
||||||
|
|||||||
@@ -132,7 +132,7 @@ func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Con
|
|||||||
if l.forceTimeout == addr {
|
if l.forceTimeout == addr {
|
||||||
l.logger.Debug("forcing timeout", "addr", addr, "me", l.addr)
|
l.logger.Debug("forcing timeout", "addr", addr, "me", l.addr)
|
||||||
|
|
||||||
// gRPC sets a deadline of 20 seconds on the dail attempt, so
|
// gRPC sets a deadline of 20 seconds on the dial attempt, so
|
||||||
// matching that here.
|
// matching that here.
|
||||||
time.Sleep(time.Second * 20)
|
time.Sleep(time.Second * 20)
|
||||||
l.l.Unlock()
|
l.l.Unlock()
|
||||||
|
|||||||
@@ -698,6 +698,9 @@ type Core struct {
|
|||||||
// if populated, the callback is called for every request
|
// if populated, the callback is called for every request
|
||||||
// for testing purposes
|
// for testing purposes
|
||||||
requestResponseCallback func(logical.Backend, *logical.Request, *logical.Response)
|
requestResponseCallback func(logical.Backend, *logical.Request, *logical.Response)
|
||||||
|
|
||||||
|
// if populated, override the default gRPC min connect timeout (currently 20s in grpc 1.51)
|
||||||
|
grpcMinConnectTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// c.stateLock needs to be held in read mode before calling this function.
|
// c.stateLock needs to be held in read mode before calling this function.
|
||||||
@@ -1286,6 +1289,16 @@ func NewCore(conf *CoreConfig) (*Core, error) {
|
|||||||
c.events.Start()
|
c.events.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
minConnectTimeoutRaw := os.Getenv("VAULT_GRPC_MIN_CONNECT_TIMEOUT")
|
||||||
|
if minConnectTimeoutRaw != "" {
|
||||||
|
dur, err := time.ParseDuration(minConnectTimeoutRaw)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Warn("VAULT_GRPC_MIN_CONNECT_TIMEOUT contains non-duration value, ignoring")
|
||||||
|
} else if dur != 0 {
|
||||||
|
c.grpcMinConnectTimeout = dur
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -278,7 +278,8 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd
|
|||||||
// ALPN header right. It's just "insecure" because GRPC isn't managing
|
// ALPN header right. It's just "insecure" because GRPC isn't managing
|
||||||
// the TLS state.
|
// the TLS state.
|
||||||
dctx, cancelFunc := context.WithCancel(ctx)
|
dctx, cancelFunc := context.WithCancel(ctx)
|
||||||
c.rpcClientConn, err = grpc.DialContext(dctx, clusterURL.Host,
|
|
||||||
|
opts := []grpc.DialOption{
|
||||||
grpc.WithDialer(clusterListener.GetDialerFunc(ctx, consts.RequestForwardingALPN)),
|
grpc.WithDialer(clusterListener.GetDialerFunc(ctx, consts.RequestForwardingALPN)),
|
||||||
grpc.WithInsecure(), // it's not, we handle it in the dialer
|
grpc.WithInsecure(), // it's not, we handle it in the dialer
|
||||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||||
@@ -287,7 +288,12 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd
|
|||||||
grpc.WithDefaultCallOptions(
|
grpc.WithDefaultCallOptions(
|
||||||
grpc.MaxCallRecvMsgSize(math.MaxInt32),
|
grpc.MaxCallRecvMsgSize(math.MaxInt32),
|
||||||
grpc.MaxCallSendMsgSize(math.MaxInt32),
|
grpc.MaxCallSendMsgSize(math.MaxInt32),
|
||||||
))
|
),
|
||||||
|
}
|
||||||
|
if c.grpcMinConnectTimeout != 0 {
|
||||||
|
opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{MinConnectTimeout: c.grpcMinConnectTimeout}))
|
||||||
|
}
|
||||||
|
c.rpcClientConn, err = grpc.DialContext(dctx, clusterURL.Host, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancelFunc()
|
cancelFunc()
|
||||||
c.logger.Error("err setting up forwarding rpc client", "error", err)
|
c.logger.Error("err setting up forwarding rpc client", "error", err)
|
||||||
|
|||||||
Reference in New Issue
Block a user