mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-29 01:32:33 +00:00
sdk/db: do not hold the lock on Close (#29097)
* sdk/db: do not hold the lock on Close * fix missing locks on return; ensure we don't overrite instance * add type and close timeout env vars * changelog
This commit is contained in:
committed by
GitHub
parent
55ca52f3fd
commit
36d7e0c6bd
3
changelog/29097.txt
Normal file
3
changelog/29097.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:bug
|
||||
sdk/database: Fix a bug where slow database connections can cause goroutines to be blocked.
|
||||
```
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
@@ -270,7 +272,7 @@ func deleteUserRespFromProto(rpcResp *proto.DeleteUserResponse) (DeleteUserRespo
|
||||
}
|
||||
|
||||
func (c gRPCClient) Type() (string, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
ctx, cancel := getContextWithTimeout(pluginutil.PluginGRPCTimeoutType)
|
||||
defer cancel()
|
||||
|
||||
typeResp, err := c.client.Type(ctx, &proto.Empty{})
|
||||
@@ -284,7 +286,7 @@ func (c gRPCClient) Type() (string, error) {
|
||||
}
|
||||
|
||||
func (c gRPCClient) Close() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
ctx, cancel := getContextWithTimeout(pluginutil.PluginGRPCTimeoutClose)
|
||||
defer cancel()
|
||||
|
||||
_, err := c.client.Close(ctx, &proto.Empty{})
|
||||
@@ -296,3 +298,11 @@ func (c gRPCClient) Close() error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getContextWithTimeout(env string) (context.Context, context.CancelFunc) {
|
||||
timeout := 1 // default timeout
|
||||
if envTimeout, err := strconv.Atoi(os.Getenv(env)); err == nil && envTimeout > 0 {
|
||||
timeout = envTimeout
|
||||
}
|
||||
return context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
|
||||
}
|
||||
|
||||
@@ -290,27 +290,46 @@ func (g *gRPCServer) Type(ctx context.Context, _ *proto.Empty) (*proto.TypeRespo
|
||||
|
||||
func (g *gRPCServer) Close(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
|
||||
impl, err := g.getDatabaseInternal(ctx)
|
||||
if err != nil {
|
||||
g.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = impl.Close()
|
||||
if err != nil {
|
||||
return &proto.Empty{}, status.Errorf(codes.Internal, "unable to close database plugin: %s", err)
|
||||
}
|
||||
|
||||
var id string
|
||||
if g.singleImpl == nil {
|
||||
// only cleanup instances map when multiplexing is supported
|
||||
id, err := pluginutil.GetMultiplexIDFromContext(ctx)
|
||||
id, err = pluginutil.GetMultiplexIDFromContext(ctx)
|
||||
if err != nil {
|
||||
g.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
delete(g.instances, id)
|
||||
}
|
||||
|
||||
// unlock here so that the subsequent call to Close() does not hold the
|
||||
// lock in case the DB is slow to respond
|
||||
g.Unlock()
|
||||
|
||||
err = impl.Close()
|
||||
if err != nil {
|
||||
// The call to Close failed, so we will put the DB instance back in the
|
||||
// map. This might not be necessary, but we do this in case anything
|
||||
// relies on being able to retry Close.
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
if g.singleImpl == nil {
|
||||
// There is a chance that while we were calling Close another DB
|
||||
// config was created for the old ID. So we only put it back if
|
||||
// it's not set.
|
||||
if _, ok := g.instances[id]; !ok {
|
||||
g.instances[id] = impl
|
||||
}
|
||||
}
|
||||
return &proto.Empty{}, status.Errorf(codes.Internal, "unable to close database plugin: %s", err)
|
||||
}
|
||||
|
||||
return &proto.Empty{}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -551,6 +551,19 @@ func TestGRPCServer_Close(t *testing.T) {
|
||||
}
|
||||
},
|
||||
},
|
||||
"error path for multiplexed plugin": {
|
||||
db: fakeDatabase{
|
||||
closeErr: errors.New("close error"),
|
||||
},
|
||||
expectErr: true,
|
||||
expectCode: codes.Internal,
|
||||
grpcSetupFunc: testGrpcServer,
|
||||
assertFunc: func(t *testing.T, g gRPCServer) {
|
||||
if len(g.instances) != 1 {
|
||||
t.Fatalf("err expected instances map to contain exactly 1 element")
|
||||
}
|
||||
},
|
||||
},
|
||||
"happy path for non-multiplexed plugin": {
|
||||
db: fakeDatabase{},
|
||||
expectErr: false,
|
||||
|
||||
@@ -49,6 +49,14 @@ const (
|
||||
// configuration as a shim to the pgx posgtres library.
|
||||
// Deprecated: VAULT_PLUGIN_USE_POSTGRES_SSLINLINE will be removed in a future version of the Vault SDK.
|
||||
PluginUsePostgresSSLInline = "VAULT_PLUGIN_USE_POSTGRES_SSLINLINE"
|
||||
|
||||
// PluginGRPCTimeoutType is an ENV name used to set the timeout for Vault's
|
||||
// call to the plugin Type() GRPC method
|
||||
PluginGRPCTimeoutType = "VAULT_PLUGIN_GRPC_TIMEOUT_TYPE"
|
||||
|
||||
// PluginGRPCTimeoutClose is an ENV name used to set the timeout for Vault's
|
||||
// call to the plugin Close() GRPC method
|
||||
PluginGRPCTimeoutClose = "VAULT_PLUGIN_GRPC_TIMEOUT_CLOSE"
|
||||
)
|
||||
|
||||
// OptionallyEnableMlock determines if mlock should be called, and if so enables
|
||||
|
||||
Reference in New Issue
Block a user