mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-29 01:32:33 +00:00
physical: use permitpool from go-secure-stdlib (#29331)
* sdk/physical: use permitpool from go-secure-stdlib * physical: use permitpool from go-secure-stdlib * fixup! sdk/physical: use permitpool from go-secure-stdlib * fixup! sdk/physical: use permitpool from go-secure-stdlib
This commit is contained in:
committed by
GitHub
parent
1bfe364d65
commit
8d83c5d047
3
go.mod
3
go.mod
@@ -114,6 +114,7 @@ require (
|
||||
github.com/hashicorp/go-secure-stdlib/nonceutil v0.1.0
|
||||
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8
|
||||
github.com/hashicorp/go-secure-stdlib/password v0.1.1
|
||||
github.com/hashicorp/go-secure-stdlib/permitpool v1.0.0
|
||||
github.com/hashicorp/go-secure-stdlib/reloadutil v0.1.1
|
||||
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2
|
||||
github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.3
|
||||
@@ -204,7 +205,7 @@ require (
|
||||
github.com/sasha-s/go-deadlock v0.3.5
|
||||
github.com/sethvargo/go-limiter v0.7.1
|
||||
github.com/shirou/gopsutil/v3 v3.22.6
|
||||
github.com/stretchr/testify v1.9.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/tink-crypto/tink-go/v2 v2.2.0
|
||||
go.etcd.io/bbolt v1.4.0-beta.0
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.17
|
||||
|
||||
5
go.sum
5
go.sum
@@ -1478,6 +1478,8 @@ github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8 h1:iBt4Ew4XEGLfh6/bPk4rSY
|
||||
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8/go.mod h1:aiJI+PIApBRQG7FZTEBx5GiiX+HbOHilUdNxUZi4eV0=
|
||||
github.com/hashicorp/go-secure-stdlib/password v0.1.1 h1:6JzmBqXprakgFEHwBgdchsjaA9x3GyjdI568bXKxa60=
|
||||
github.com/hashicorp/go-secure-stdlib/password v0.1.1/go.mod h1:9hH302QllNwu1o2TGYtSk8I8kTAN0ca1EHpwhm5Mmzo=
|
||||
github.com/hashicorp/go-secure-stdlib/permitpool v1.0.0 h1:U6y5MXGiDVOOtkWJ6o/tu1TxABnI0yKTQWJr7z6BpNk=
|
||||
github.com/hashicorp/go-secure-stdlib/permitpool v1.0.0/go.mod h1:ecDb3o+8D4xtP0nTCufJaAVawHavy5M2eZ64Nq/8/LM=
|
||||
github.com/hashicorp/go-secure-stdlib/plugincontainer v0.4.1 h1:JY+zGg8gOmslwif1fiCqT5Hu1SikLZQcHkmQhCoA9gY=
|
||||
github.com/hashicorp/go-secure-stdlib/plugincontainer v0.4.1/go.mod h1:jW3KCTvdPyAdVecOUwiiO2XaYgUJ/isigt++ISkszkY=
|
||||
github.com/hashicorp/go-secure-stdlib/reloadutil v0.1.1 h1:SMGUnbpAcat8rIKHkBPjfv81yC46a8eCNZ2hsR2l1EI=
|
||||
@@ -2159,8 +2161,9 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
|
||||
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/tencentcloud/tencentcloud-sdk-go v1.0.162 h1:8fDzz4GuVg4skjY2B0nMN7h6uN61EDVkuLyI2+qGHhI=
|
||||
github.com/tencentcloud/tencentcloud-sdk-go v1.0.162/go.mod h1:asUz5BPXxgoPGaRgZaVm1iGcUAuHyYUo1nXqKa83cvI=
|
||||
github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao=
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/aliyun/aliyun-oss-go-sdk/oss"
|
||||
"github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
)
|
||||
|
||||
@@ -39,7 +40,7 @@ type AliCloudOSSBackend struct {
|
||||
bucket string
|
||||
client *oss.Client
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
}
|
||||
|
||||
// NewAliCloudOSSBackend constructs an OSS backend using a pre-existing
|
||||
@@ -113,7 +114,7 @@ func NewAliCloudOSSBackend(conf map[string]string, logger log.Logger) (physical.
|
||||
client: client,
|
||||
bucket: bucket,
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
@@ -122,7 +123,9 @@ func NewAliCloudOSSBackend(conf map[string]string, logger log.Logger) (physical.
|
||||
func (a *AliCloudOSSBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{AlibabaMetricKey, "put"}, time.Now())
|
||||
|
||||
a.permitPool.Acquire()
|
||||
if err := a.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.permitPool.Release()
|
||||
|
||||
bucket, err := a.client.Bucket(a.bucket)
|
||||
@@ -137,7 +140,9 @@ func (a *AliCloudOSSBackend) Put(ctx context.Context, entry *physical.Entry) err
|
||||
func (a *AliCloudOSSBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{AlibabaMetricKey, "get"}, time.Now())
|
||||
|
||||
a.permitPool.Acquire()
|
||||
if err := a.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer a.permitPool.Release()
|
||||
|
||||
bucket, err := a.client.Bucket(a.bucket)
|
||||
@@ -174,7 +179,9 @@ func (a *AliCloudOSSBackend) Get(ctx context.Context, key string) (*physical.Ent
|
||||
func (a *AliCloudOSSBackend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{AlibabaMetricKey, "delete"}, time.Now())
|
||||
|
||||
a.permitPool.Acquire()
|
||||
if err := a.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.permitPool.Release()
|
||||
|
||||
bucket, err := a.client.Bucket(a.bucket)
|
||||
@@ -190,7 +197,9 @@ func (a *AliCloudOSSBackend) Delete(ctx context.Context, key string) error {
|
||||
func (a *AliCloudOSSBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{AlibabaMetricKey, "list"}, time.Now())
|
||||
|
||||
a.permitPool.Acquire()
|
||||
if err := a.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer a.permitPool.Release()
|
||||
|
||||
keys := []string{}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/Azure/go-autorest/autorest/azure"
|
||||
"github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
)
|
||||
@@ -37,7 +38,7 @@ const (
|
||||
type AzureBackend struct {
|
||||
container *azblob.ContainerURL
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
}
|
||||
|
||||
// Verify AzureBackend satisfies the correct interfaces
|
||||
@@ -191,7 +192,7 @@ func NewAzureBackend(conf map[string]string, logger log.Logger) (physical.Backen
|
||||
a := &AzureBackend{
|
||||
container: &containerURL,
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
@@ -233,7 +234,9 @@ func (a *AzureBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
return fmt.Errorf("value is bigger than the current supported limit of 4MBytes")
|
||||
}
|
||||
|
||||
a.permitPool.Acquire()
|
||||
if err := a.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.permitPool.Release()
|
||||
|
||||
blobURL := a.container.NewBlockBlobURL(entry.Key)
|
||||
@@ -248,7 +251,9 @@ func (a *AzureBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
func (a *AzureBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"azure", "get"}, time.Now())
|
||||
|
||||
a.permitPool.Acquire()
|
||||
if err := a.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer a.permitPool.Release()
|
||||
|
||||
blobURL := a.container.NewBlockBlobURL(key)
|
||||
@@ -285,7 +290,9 @@ func (a *AzureBackend) Get(ctx context.Context, key string) (*physical.Entry, er
|
||||
func (a *AzureBackend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{"azure", "delete"}, time.Now())
|
||||
|
||||
a.permitPool.Acquire()
|
||||
if err := a.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.permitPool.Release()
|
||||
|
||||
blobURL := a.container.NewBlockBlobURL(key)
|
||||
@@ -310,7 +317,9 @@ func (a *AzureBackend) Delete(ctx context.Context, key string) error {
|
||||
func (a *AzureBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"azure", "list"}, time.Now())
|
||||
|
||||
a.permitPool.Acquire()
|
||||
if err := a.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer a.permitPool.Release()
|
||||
|
||||
var keys []string
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/cockroachdb/cockroach-go/v2/crdb"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
_ "github.com/jackc/pgx/v4/stdlib"
|
||||
@@ -44,7 +45,7 @@ type CockroachDBBackend struct {
|
||||
rawHAStatements map[string]string
|
||||
haStatements map[string]*sql.Stmt
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
haEnabled bool
|
||||
}
|
||||
|
||||
@@ -142,7 +143,7 @@ func NewCockroachDBBackend(conf map[string]string, logger log.Logger) (physical.
|
||||
},
|
||||
haStatements: make(map[string]*sql.Stmt),
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
haEnabled: haEnabled,
|
||||
}
|
||||
|
||||
@@ -176,7 +177,9 @@ func (c *CockroachDBBackend) prepare(statementMap map[string]*sql.Stmt, name, qu
|
||||
func (c *CockroachDBBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"cockroachdb", "put"}, time.Now())
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
_, err := c.statements["put"].Exec(entry.Key, entry.Value)
|
||||
@@ -190,7 +193,9 @@ func (c *CockroachDBBackend) Put(ctx context.Context, entry *physical.Entry) err
|
||||
func (c *CockroachDBBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"cockroachdb", "get"}, time.Now())
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
var result []byte
|
||||
@@ -213,7 +218,9 @@ func (c *CockroachDBBackend) Get(ctx context.Context, key string) (*physical.Ent
|
||||
func (c *CockroachDBBackend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{"cockroachdb", "delete"}, time.Now())
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
_, err := c.statements["delete"].Exec(key)
|
||||
@@ -228,7 +235,9 @@ func (c *CockroachDBBackend) Delete(ctx context.Context, key string) error {
|
||||
func (c *CockroachDBBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"cockroachdb", "list"}, time.Now())
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
likePrefix := prefix + "%"
|
||||
@@ -267,7 +276,9 @@ func (c *CockroachDBBackend) Transaction(ctx context.Context, txns []*physical.T
|
||||
return nil
|
||||
}
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
return crdb.ExecuteTx(context.Background(), c.client, nil, func(tx *sql.Tx) error {
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
package cockroachdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sync"
|
||||
@@ -106,7 +107,9 @@ func (l *CockroachDBLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error)
|
||||
// CockroachDB table.
|
||||
func (l *CockroachDBLock) Unlock() error {
|
||||
c := l.backend
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(context.Background()); err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
if l.renewTicker != nil {
|
||||
@@ -121,7 +124,9 @@ func (l *CockroachDBLock) Unlock() error {
|
||||
// including this one, and returns the current value.
|
||||
func (l *CockroachDBLock) Value() (bool, string, error) {
|
||||
c := l.backend
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(context.Background()); err != nil {
|
||||
return false, "", err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
var result string
|
||||
err := c.haStatements["get"].QueryRow(l.key).Scan(&result)
|
||||
@@ -185,7 +190,9 @@ func (l *CockroachDBLock) periodicallyRenewLock(done chan struct{}) {
|
||||
// else has the lock, whereas non-nil means that something unexpected happened.
|
||||
func (l *CockroachDBLock) writeItem() (bool, error) {
|
||||
c := l.backend
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(context.Background()); err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
sqlResult, err := c.haStatements["upsert"].Exec(l.identity, l.key, l.value, fmt.Sprintf("%d seconds", l.ttlSeconds))
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-secure-stdlib/parseutil"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/tlsutil"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
@@ -60,7 +61,7 @@ type ConsulBackend struct {
|
||||
path string
|
||||
kv *api.KV
|
||||
txn *api.Txn
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
consistencyMode string
|
||||
sessionTTL string
|
||||
lockWaitTime time.Duration
|
||||
@@ -161,7 +162,7 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe
|
||||
client: client,
|
||||
kv: client.KV(),
|
||||
txn: client.Txn(),
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
consistencyMode: consistencyMode,
|
||||
sessionTTL: sessionTTL,
|
||||
lockWaitTime: lockWaitTime,
|
||||
@@ -253,7 +254,9 @@ func (c *ConsulBackend) ExpandedCapabilitiesAvailable(ctx context.Context) bool
|
||||
}}
|
||||
}
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return false
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
queryOpts := &api.QueryOptions{}
|
||||
@@ -332,7 +335,9 @@ func (c *ConsulBackend) txnInternal(ctx context.Context, txns []*physical.TxnEnt
|
||||
ops = append(ops, o)
|
||||
}
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
var retErr *multierror.Error
|
||||
@@ -455,7 +460,9 @@ func (c *ConsulBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
func (c *ConsulBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"consul", "get"}, time.Now())
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
queryOpts := &api.QueryOptions{}
|
||||
@@ -505,7 +512,9 @@ func (c *ConsulBackend) List(ctx context.Context, prefix string) ([]string, erro
|
||||
scan = scan[:len(scan)-1]
|
||||
}
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
queryOpts := &api.QueryOptions{}
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
metrics "github.com/armon/go-metrics"
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
)
|
||||
|
||||
@@ -26,7 +27,7 @@ import (
|
||||
type CouchDBBackend struct {
|
||||
logger log.Logger
|
||||
client *couchDBClient
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
}
|
||||
|
||||
// Verify CouchDBBackend satisfies the correct interfaces
|
||||
@@ -196,7 +197,7 @@ func buildCouchDBBackend(conf map[string]string, logger log.Logger) (*CouchDBBac
|
||||
Client: cleanhttp.DefaultPooledClient(),
|
||||
},
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -213,7 +214,9 @@ type couchDBEntry struct {
|
||||
|
||||
// Put is used to insert or update an entry
|
||||
func (m *CouchDBBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
return m.PutInternal(ctx, entry)
|
||||
@@ -221,7 +224,9 @@ func (m *CouchDBBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
|
||||
// Get is used to fetch an entry
|
||||
func (m *CouchDBBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
return m.GetInternal(ctx, key)
|
||||
@@ -229,7 +234,9 @@ func (m *CouchDBBackend) Get(ctx context.Context, key string) (*physical.Entry,
|
||||
|
||||
// Delete is used to permanently delete an entry
|
||||
func (m *CouchDBBackend) Delete(ctx context.Context, key string) error {
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
return m.DeleteInternal(ctx, key)
|
||||
@@ -239,7 +246,9 @@ func (m *CouchDBBackend) Delete(ctx context.Context, key string) error {
|
||||
func (m *CouchDBBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"couchdb", "list"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
items, err := m.client.list(prefix)
|
||||
@@ -276,7 +285,7 @@ func NewTransactionalCouchDBBackend(conf map[string]string, logger log.Logger) (
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
backend.permitPool = physical.NewPermitPool(1)
|
||||
backend.permitPool = permitpool.New(1)
|
||||
|
||||
return &TransactionalCouchDBBackend{
|
||||
CouchDBBackend: *backend,
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/awsutil"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
@@ -126,7 +127,7 @@ type DynamoDBLockRecord struct {
|
||||
}
|
||||
|
||||
type PermitPoolWithMetrics struct {
|
||||
physical.PermitPool
|
||||
permitpool.Pool
|
||||
pendingPermits int32
|
||||
poolSize int
|
||||
}
|
||||
@@ -322,7 +323,9 @@ func (d *DynamoDBBackend) Put(ctx context.Context, entry *physical.Entry) error
|
||||
func (d *DynamoDBBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"dynamodb", "get"}, time.Now())
|
||||
|
||||
d.permitPool.Acquire()
|
||||
if err := d.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer d.permitPool.Release()
|
||||
|
||||
resp, err := d.client.GetItemWithContext(ctx, &dynamodb.GetItemInput{
|
||||
@@ -438,7 +441,9 @@ func (d *DynamoDBBackend) List(ctx context.Context, prefix string) ([]string, er
|
||||
},
|
||||
}
|
||||
|
||||
d.permitPool.Acquire()
|
||||
if err := d.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer d.permitPool.Release()
|
||||
|
||||
err := d.client.QueryPagesWithContext(ctx, queryInput, func(out *dynamodb.QueryOutput, lastPage bool) bool {
|
||||
@@ -491,7 +496,9 @@ func (d *DynamoDBBackend) hasChildren(ctx context.Context, prefix string, exclud
|
||||
Limit: aws.Int64(int64(len(exclude) + 1)),
|
||||
}
|
||||
|
||||
d.permitPool.Acquire()
|
||||
if err := d.permitPool.Acquire(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer d.permitPool.Release()
|
||||
|
||||
out, err := d.client.QueryWithContext(ctx, queryInput)
|
||||
@@ -547,8 +554,9 @@ func (d *DynamoDBBackend) batchWriteRequests(ctx context.Context, requests []*dy
|
||||
requests = requests[batchSize:]
|
||||
|
||||
var err error
|
||||
|
||||
d.permitPool.Acquire()
|
||||
if err := d.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
boff := backoff.NewExponentialBackOff()
|
||||
boff.MaxElapsedTime = 600 * time.Second
|
||||
@@ -996,34 +1004,38 @@ func isConditionCheckFailed(err error) bool {
|
||||
// number of permits which emits metrics
|
||||
func NewPermitPoolWithMetrics(permits int) *PermitPoolWithMetrics {
|
||||
return &PermitPoolWithMetrics{
|
||||
PermitPool: *physical.NewPermitPool(permits),
|
||||
Pool: *permitpool.New(permits),
|
||||
pendingPermits: 0,
|
||||
poolSize: permits,
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire returns when a permit has been acquired
|
||||
func (c *PermitPoolWithMetrics) Acquire() {
|
||||
func (c *PermitPoolWithMetrics) Acquire(ctx context.Context) error {
|
||||
atomic.AddInt32(&c.pendingPermits, 1)
|
||||
c.emitPermitMetrics()
|
||||
c.PermitPool.Acquire()
|
||||
err := c.Pool.Acquire(ctx)
|
||||
atomic.AddInt32(&c.pendingPermits, -1)
|
||||
c.emitPermitMetrics()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Release returns a permit to the pool
|
||||
func (c *PermitPoolWithMetrics) Release() {
|
||||
c.PermitPool.Release()
|
||||
c.Pool.Release()
|
||||
c.emitPermitMetrics()
|
||||
}
|
||||
|
||||
// Get the number of requests in the permit pool
|
||||
func (c *PermitPoolWithMetrics) CurrentPermits() int {
|
||||
return c.PermitPool.CurrentPermits()
|
||||
return c.Pool.CurrentPermits()
|
||||
}
|
||||
|
||||
func (c *PermitPoolWithMetrics) emitPermitMetrics() {
|
||||
metrics.SetGauge([]string{"dynamodb", "permit_pool", "pending_permits"}, float32(c.pendingPermits))
|
||||
metrics.SetGauge([]string{"dynamodb", "permit_pool", "active_permits"}, float32(c.PermitPool.CurrentPermits()))
|
||||
metrics.SetGauge([]string{"dynamodb", "permit_pool", "active_permits"}, float32(c.Pool.CurrentPermits()))
|
||||
metrics.SetGauge([]string{"dynamodb", "permit_pool", "pool_size"}, float32(c.poolSize))
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/parseutil"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||||
@@ -34,7 +35,7 @@ type EtcdBackend struct {
|
||||
lockTimeout time.Duration
|
||||
requestTimeout time.Duration
|
||||
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
|
||||
etcd *clientv3.Client
|
||||
}
|
||||
@@ -178,7 +179,7 @@ func newEtcd3Backend(conf map[string]string, logger log.Logger) (physical.Backen
|
||||
return &EtcdBackend{
|
||||
path: path,
|
||||
etcd: etcd,
|
||||
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
|
||||
permitPool: permitpool.New(physical.DefaultParallelOperations),
|
||||
logger: logger,
|
||||
haEnabled: haEnabledBool,
|
||||
lockTimeout: lock,
|
||||
@@ -189,7 +190,9 @@ func newEtcd3Backend(conf map[string]string, logger log.Logger) (physical.Backen
|
||||
func (c *EtcdBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"etcd", "put"}, time.Now())
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
||||
@@ -201,7 +204,9 @@ func (c *EtcdBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
func (c *EtcdBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"etcd", "get"}, time.Now())
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
||||
@@ -226,7 +231,9 @@ func (c *EtcdBackend) Get(ctx context.Context, key string) (*physical.Entry, err
|
||||
func (c *EtcdBackend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{"etcd", "delete"}, time.Now())
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
||||
@@ -241,7 +248,9 @@ func (c *EtcdBackend) Delete(ctx context.Context, key string) error {
|
||||
func (c *EtcdBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"etcd", "list"}, time.Now())
|
||||
|
||||
c.permitPool.Acquire()
|
||||
if err := c.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.permitPool.Release()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
metrics "github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/vault/helper/useragent"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
"google.golang.org/api/iterator"
|
||||
@@ -75,7 +76,7 @@ type Backend struct {
|
||||
// client is the API client and permitPool is the allowed concurrent uses of
|
||||
// the client.
|
||||
client *storage.Client
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
|
||||
// haEnabled indicates if HA is enabled.
|
||||
haEnabled bool
|
||||
@@ -171,7 +172,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
|
||||
bucket: bucket,
|
||||
chunkSize: chunkSize,
|
||||
client: client,
|
||||
permitPool: physical.NewPermitPool(maxParallel),
|
||||
permitPool: permitpool.New(maxParallel),
|
||||
|
||||
haEnabled: haEnabled,
|
||||
haClient: haClient,
|
||||
@@ -185,7 +186,9 @@ func (b *Backend) Put(ctx context.Context, entry *physical.Entry) (retErr error)
|
||||
defer metrics.MeasureSince(metricPut, time.Now())
|
||||
|
||||
// Pooling
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
// Insert
|
||||
@@ -211,7 +214,9 @@ func (b *Backend) Get(ctx context.Context, key string) (retEntry *physical.Entry
|
||||
defer metrics.MeasureSince(metricGet, time.Now())
|
||||
|
||||
// Pooling
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
// Read
|
||||
@@ -246,7 +251,9 @@ func (b *Backend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince(metricDelete, time.Now())
|
||||
|
||||
// Pooling
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
// Delete
|
||||
@@ -263,7 +270,9 @@ func (b *Backend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince(metricList, time.Now())
|
||||
|
||||
// Pooling
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
iter := b.client.Bucket(b.bucket).Objects(ctx, &storage.Query{
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
triton "github.com/joyent/triton-go"
|
||||
"github.com/joyent/triton-go/authentication"
|
||||
@@ -28,7 +29,7 @@ const mantaDefaultRootStore = "/stor"
|
||||
|
||||
type MantaBackend struct {
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
client *storage.StorageClient
|
||||
directory string
|
||||
}
|
||||
@@ -95,7 +96,7 @@ func NewMantaBackend(conf map[string]string, logger log.Logger) (physical.Backen
|
||||
client: client,
|
||||
directory: conf["directory"],
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -103,7 +104,9 @@ func NewMantaBackend(conf map[string]string, logger log.Logger) (physical.Backen
|
||||
func (m *MantaBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"manta", "put"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
r := bytes.NewReader(entry.Value)
|
||||
@@ -121,7 +124,9 @@ func (m *MantaBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
func (m *MantaBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"manta", "get"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
output, err := m.client.Objects().Get(ctx, &storage.GetObjectInput{
|
||||
@@ -154,7 +159,9 @@ func (m *MantaBackend) Get(ctx context.Context, key string) (*physical.Entry, er
|
||||
func (m *MantaBackend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{"manta", "delete"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
if strings.HasSuffix(key, "/") {
|
||||
@@ -210,7 +217,9 @@ func tryDeleteDirectory(ctx context.Context, m *MantaBackend, directoryPath stri
|
||||
func (m *MantaBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"manta", "list"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/vault/sdk/helper/logging"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
triton "github.com/joyent/triton-go"
|
||||
@@ -57,7 +58,7 @@ func TestMantaBackend(t *testing.T) {
|
||||
client: client,
|
||||
directory: testHarnessBucket,
|
||||
logger: logger.Named("storage.mantabackend"),
|
||||
permitPool: physical.NewPermitPool(128),
|
||||
permitPool: permitpool.New(128),
|
||||
}
|
||||
|
||||
err = mb.client.Dir().Put(context.Background(), &storage.PutDirectoryInput{
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
metrics "github.com/armon/go-metrics"
|
||||
_ "github.com/denisenkom/go-mssqldb"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
)
|
||||
@@ -31,7 +32,7 @@ type MSSQLBackend struct {
|
||||
client *sql.DB
|
||||
statements map[string]*sql.Stmt
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
}
|
||||
|
||||
func isInvalidIdentifier(name string) bool {
|
||||
@@ -171,7 +172,7 @@ func NewMSSQLBackend(conf map[string]string, logger log.Logger) (physical.Backen
|
||||
client: db,
|
||||
statements: make(map[string]*sql.Stmt),
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
}
|
||||
|
||||
statements := map[string]string{
|
||||
@@ -205,7 +206,9 @@ func (m *MSSQLBackend) prepare(name, query string) error {
|
||||
func (m *MSSQLBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"mssql", "put"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
_, err := m.statements["put"].Exec(entry.Key, entry.Value, entry.Key, entry.Key, entry.Value)
|
||||
@@ -219,7 +222,9 @@ func (m *MSSQLBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
func (m *MSSQLBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"mssql", "get"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
var result []byte
|
||||
@@ -243,7 +248,9 @@ func (m *MSSQLBackend) Get(ctx context.Context, key string) (*physical.Entry, er
|
||||
func (m *MSSQLBackend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{"mssql", "delete"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
_, err := m.statements["delete"].Exec(key)
|
||||
@@ -257,7 +264,9 @@ func (m *MSSQLBackend) Delete(ctx context.Context, key string) error {
|
||||
func (m *MSSQLBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"mssql", "list"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
likePrefix := prefix + "%"
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
mysql "github.com/go-sql-driver/mysql"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
)
|
||||
@@ -47,7 +48,7 @@ type MySQLBackend struct {
|
||||
client *sql.DB
|
||||
statements map[string]*sql.Stmt
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
conf map[string]string
|
||||
redirectHost string
|
||||
redirectPort int64
|
||||
@@ -173,7 +174,7 @@ func NewMySQLBackend(conf map[string]string, logger log.Logger) (physical.Backen
|
||||
client: db,
|
||||
statements: make(map[string]*sql.Stmt),
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
conf: conf,
|
||||
haEnabled: haEnabled,
|
||||
}
|
||||
@@ -365,7 +366,9 @@ func (m *MySQLBackend) prepare(name, query string) error {
|
||||
func (m *MySQLBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"mysql", "put"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
_, err := m.statements["put"].Exec(entry.Key, entry.Value)
|
||||
@@ -379,7 +382,9 @@ func (m *MySQLBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
func (m *MySQLBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"mysql", "get"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
var result []byte
|
||||
@@ -402,7 +407,9 @@ func (m *MySQLBackend) Get(ctx context.Context, key string) (*physical.Entry, er
|
||||
func (m *MySQLBackend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{"mysql", "delete"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
_, err := m.statements["delete"].Exec(key)
|
||||
@@ -417,7 +424,9 @@ func (m *MySQLBackend) Delete(ctx context.Context, key string) error {
|
||||
func (m *MySQLBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"mysql", "list"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
// Add the % wildcard to the prefix to do the prefix search
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
@@ -64,7 +65,7 @@ type Backend struct {
|
||||
client *objectstorage.ObjectStorageClient
|
||||
bucketName string
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
namespaceName string
|
||||
haEnabled bool
|
||||
lockBucketName string
|
||||
@@ -141,7 +142,7 @@ func NewBackend(conf map[string]string, logger log.Logger) (physical.Backend, er
|
||||
client: &objectStorageClient,
|
||||
bucketName: bucketName,
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(MaxNumberOfPermits),
|
||||
permitPool: permitpool.New(MaxNumberOfPermits),
|
||||
namespaceName: namespaceName,
|
||||
haEnabled: haEnabled,
|
||||
lockBucketName: lockBucketName,
|
||||
@@ -153,7 +154,9 @@ func (o *Backend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince(metricPutFull, time.Now())
|
||||
startAcquirePool := time.Now()
|
||||
metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits()))
|
||||
o.permitPool.Acquire()
|
||||
if err := o.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer o.permitPool.Release()
|
||||
metrics.MeasureSince(metricPutAcquirePool, startAcquirePool)
|
||||
|
||||
@@ -198,7 +201,9 @@ func (o *Backend) Get(ctx context.Context, key string) (*physical.Entry, error)
|
||||
defer metrics.MeasureSince(metricGetFull, time.Now())
|
||||
metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits()))
|
||||
startAcquirePool := time.Now()
|
||||
o.permitPool.Acquire()
|
||||
if err := o.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer o.permitPool.Release()
|
||||
metrics.MeasureSince(metricGetAcquirePool, startAcquirePool)
|
||||
|
||||
@@ -249,7 +254,9 @@ func (o *Backend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince(metricDeleteFull, time.Now())
|
||||
metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits()))
|
||||
startAcquirePool := time.Now()
|
||||
o.permitPool.Acquire()
|
||||
if err := o.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer o.permitPool.Release()
|
||||
metrics.MeasureSince(metricDeleteAcquirePool, startAcquirePool)
|
||||
|
||||
@@ -291,7 +298,9 @@ func (o *Backend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince(metricListFull, time.Now())
|
||||
metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits()))
|
||||
startAcquirePool := time.Now()
|
||||
o.permitPool.Acquire()
|
||||
if err := o.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer o.permitPool.Release()
|
||||
|
||||
metrics.MeasureSince(metricListAcquirePool, startAcquirePool)
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/sdk/database/helper/dbutil"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
@@ -64,7 +65,7 @@ type PostgreSQLBackend struct {
|
||||
|
||||
haEnabled bool
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
}
|
||||
|
||||
// PostgreSQLLock implements a lock using an PostgreSQL client.
|
||||
@@ -192,7 +193,7 @@ func NewPostgreSQLBackend(conf map[string]string, logger log.Logger) (physical.B
|
||||
// $1=ha_identity $2=ha_key
|
||||
" DELETE FROM " + quoted_ha_table + " WHERE ha_identity=$1 AND ha_key=$2 ",
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
haEnabled: conf["ha_enabled"] == "true",
|
||||
}
|
||||
|
||||
@@ -240,7 +241,9 @@ func (m *PostgreSQLBackend) splitKey(fullPath string) (string, string, string) {
|
||||
func (m *PostgreSQLBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"postgres", "put"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
parentPath, path, key := m.splitKey(entry.Key)
|
||||
@@ -256,7 +259,9 @@ func (m *PostgreSQLBackend) Put(ctx context.Context, entry *physical.Entry) erro
|
||||
func (m *PostgreSQLBackend) Get(ctx context.Context, fullPath string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"postgres", "get"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
_, path, key := m.splitKey(fullPath)
|
||||
@@ -281,7 +286,9 @@ func (m *PostgreSQLBackend) Get(ctx context.Context, fullPath string) (*physical
|
||||
func (m *PostgreSQLBackend) Delete(ctx context.Context, fullPath string) error {
|
||||
defer metrics.MeasureSince([]string{"postgres", "delete"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
_, path, key := m.splitKey(fullPath)
|
||||
@@ -298,7 +305,9 @@ func (m *PostgreSQLBackend) Delete(ctx context.Context, fullPath string) error {
|
||||
func (m *PostgreSQLBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"postgres", "list"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
if err := m.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer m.permitPool.Release()
|
||||
|
||||
rows, err := m.client.QueryContext(ctx, m.list_query, "/"+prefix)
|
||||
@@ -377,7 +386,9 @@ func (l *PostgreSQLLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
||||
// PostgreSQL table.
|
||||
func (l *PostgreSQLLock) Unlock() error {
|
||||
pg := l.backend
|
||||
pg.permitPool.Acquire()
|
||||
if err := pg.permitPool.Acquire(context.Background()); err != nil {
|
||||
return err
|
||||
}
|
||||
defer pg.permitPool.Release()
|
||||
|
||||
if l.renewTicker != nil {
|
||||
@@ -393,7 +404,9 @@ func (l *PostgreSQLLock) Unlock() error {
|
||||
// including this one, and returns the current value.
|
||||
func (l *PostgreSQLLock) Value() (bool, string, error) {
|
||||
pg := l.backend
|
||||
pg.permitPool.Acquire()
|
||||
if err := pg.permitPool.Acquire(context.Background()); err != nil {
|
||||
return false, "", err
|
||||
}
|
||||
defer pg.permitPool.Release()
|
||||
var result string
|
||||
err := pg.client.QueryRow(pg.haGetLockValueQuery, l.key).Scan(&result)
|
||||
@@ -453,7 +466,9 @@ func (l *PostgreSQLLock) periodicallyRenewLock(done chan struct{}) {
|
||||
// else has the lock, whereas non-nil means that something unexpected happened.
|
||||
func (l *PostgreSQLLock) writeItem() (bool, error) {
|
||||
pg := l.backend
|
||||
pg.permitPool.Acquire()
|
||||
if err := pg.permitPool.Acquire(context.Background()); err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer pg.permitPool.Release()
|
||||
|
||||
// Try steal lock or update expiry on my lock
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-raftchunking"
|
||||
"github.com/hashicorp/go-secure-stdlib/parseutil"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/tlsutil"
|
||||
"github.com/hashicorp/raft"
|
||||
autopilot "github.com/hashicorp/raft-autopilot"
|
||||
@@ -175,7 +176,7 @@ type RaftBackend struct {
|
||||
serverAddressProvider raft.ServerAddressProvider
|
||||
|
||||
// permitPool is used to limit the number of concurrent storage calls.
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
|
||||
// maxEntrySize imposes a size limit (in bytes) on a raft entry (put or transaction).
|
||||
// It is suggested to use a value of 2x the Raft chunking size for optimal
|
||||
@@ -632,7 +633,7 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
|
||||
closers: closers,
|
||||
dataDir: backendConfig.Path,
|
||||
localID: backendConfig.NodeId,
|
||||
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
|
||||
permitPool: permitpool.New(physical.DefaultParallelOperations),
|
||||
maxEntrySize: backendConfig.MaxEntrySize,
|
||||
maxMountAndNamespaceEntrySize: backendConfig.MaxMountAndNamespaceTableEntrySize,
|
||||
maxBatchEntries: backendConfig.MaxBatchEntries,
|
||||
@@ -819,7 +820,9 @@ func (b *RaftBackend) applyVerifierCheckpoint() error {
|
||||
data := make([]byte, 1)
|
||||
data[0] = byte(verifierCheckpointOp)
|
||||
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(context.Background()); err != nil {
|
||||
return err
|
||||
}
|
||||
b.l.RLock()
|
||||
|
||||
var err error
|
||||
@@ -1823,7 +1826,9 @@ func (b *RaftBackend) Delete(ctx context.Context, path string) error {
|
||||
},
|
||||
},
|
||||
}
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
b.l.RLock()
|
||||
@@ -1843,7 +1848,9 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
@@ -1886,7 +1893,9 @@ func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
},
|
||||
}
|
||||
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
b.l.RLock()
|
||||
@@ -1906,7 +1915,9 @@ func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
@@ -1961,7 +1972,9 @@ func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry
|
||||
command.Operations[i] = op
|
||||
}
|
||||
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
b.l.RLock()
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/awsutil"
|
||||
"github.com/hashicorp/go-secure-stdlib/parseutil"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
)
|
||||
@@ -40,7 +41,7 @@ type S3Backend struct {
|
||||
kmsKeyId string
|
||||
client *s3.S3
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
}
|
||||
|
||||
// NewS3Backend constructs a S3 backend using a pre-existing
|
||||
@@ -157,7 +158,7 @@ func NewS3Backend(conf map[string]string, logger log.Logger) (physical.Backend,
|
||||
path: path,
|
||||
kmsKeyId: kmsKeyId,
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
@@ -166,7 +167,9 @@ func NewS3Backend(conf map[string]string, logger log.Logger) (physical.Backend,
|
||||
func (s *S3Backend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"s3", "put"}, time.Now())
|
||||
|
||||
s.permitPool.Acquire()
|
||||
if err := s.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.permitPool.Release()
|
||||
|
||||
// Setup key
|
||||
@@ -195,7 +198,9 @@ func (s *S3Backend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
func (s *S3Backend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"s3", "get"}, time.Now())
|
||||
|
||||
s.permitPool.Acquire()
|
||||
if err := s.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer s.permitPool.Release()
|
||||
|
||||
// Setup key
|
||||
@@ -248,7 +253,9 @@ func (s *S3Backend) Get(ctx context.Context, key string) (*physical.Entry, error
|
||||
func (s *S3Backend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{"s3", "delete"}, time.Now())
|
||||
|
||||
s.permitPool.Acquire()
|
||||
if err := s.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.permitPool.Release()
|
||||
|
||||
// Setup key
|
||||
@@ -270,7 +277,9 @@ func (s *S3Backend) Delete(ctx context.Context, key string) error {
|
||||
func (s *S3Backend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"s3", "list"}, time.Now())
|
||||
|
||||
s.permitPool.Acquire()
|
||||
if err := s.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer s.permitPool.Release()
|
||||
|
||||
// Setup prefix
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"cloud.google.com/go/spanner"
|
||||
metrics "github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
||||
"github.com/hashicorp/vault/helper/useragent"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
@@ -85,7 +86,7 @@ type Backend struct {
|
||||
// client is the API client and permitPool is the allowed concurrent uses of
|
||||
// the client.
|
||||
client *spanner.Client
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
|
||||
// haTable is the name of the table to use for HA in the database.
|
||||
haTable string
|
||||
@@ -190,7 +191,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
|
||||
database: database,
|
||||
table: table,
|
||||
client: client,
|
||||
permitPool: physical.NewPermitPool(maxParallel),
|
||||
permitPool: permitpool.New(maxParallel),
|
||||
|
||||
haEnabled: haEnabled,
|
||||
haTable: haTable,
|
||||
@@ -205,7 +206,9 @@ func (b *Backend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince(metricPut, time.Now())
|
||||
|
||||
// Pooling
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
// Insert
|
||||
@@ -224,7 +227,9 @@ func (b *Backend) Get(ctx context.Context, key string) (*physical.Entry, error)
|
||||
defer metrics.MeasureSince(metricGet, time.Now())
|
||||
|
||||
// Pooling
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
// Read
|
||||
@@ -252,7 +257,9 @@ func (b *Backend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince(metricDelete, time.Now())
|
||||
|
||||
// Pooling
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
// Delete
|
||||
@@ -269,7 +276,9 @@ func (b *Backend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince(metricList, time.Now())
|
||||
|
||||
// Pooling
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
// Sanitize
|
||||
@@ -347,7 +356,9 @@ func (b *Backend) Transaction(ctx context.Context, txns []*physical.TxnEntry) er
|
||||
}
|
||||
|
||||
// Pooling
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
// Transactivate!
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
metrics "github.com/armon/go-metrics"
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
"github.com/ncw/swift"
|
||||
@@ -29,7 +30,7 @@ type SwiftBackend struct {
|
||||
container string
|
||||
client *swift.Connection
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
}
|
||||
|
||||
// NewSwiftBackend constructs a Swift backend using a pre-existing
|
||||
@@ -148,7 +149,7 @@ func NewSwiftBackend(conf map[string]string, logger log.Logger) (physical.Backen
|
||||
client: &c,
|
||||
container: container,
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(maxParInt),
|
||||
permitPool: permitpool.New(maxParInt),
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
@@ -157,7 +158,9 @@ func NewSwiftBackend(conf map[string]string, logger log.Logger) (physical.Backen
|
||||
func (s *SwiftBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"swift", "put"}, time.Now())
|
||||
|
||||
s.permitPool.Acquire()
|
||||
if err := s.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.permitPool.Release()
|
||||
|
||||
err := s.client.ObjectPutBytes(s.container, entry.Key, entry.Value, "")
|
||||
@@ -172,7 +175,9 @@ func (s *SwiftBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
func (s *SwiftBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"swift", "get"}, time.Now())
|
||||
|
||||
s.permitPool.Acquire()
|
||||
if err := s.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer s.permitPool.Release()
|
||||
|
||||
// Do a list of names with the key first since eventual consistency means
|
||||
@@ -204,7 +209,9 @@ func (s *SwiftBackend) Get(ctx context.Context, key string) (*physical.Entry, er
|
||||
func (s *SwiftBackend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{"swift", "delete"}, time.Now())
|
||||
|
||||
s.permitPool.Acquire()
|
||||
if err := s.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.permitPool.Release()
|
||||
|
||||
err := s.client.ObjectDelete(s.container, key)
|
||||
@@ -221,7 +228,9 @@ func (s *SwiftBackend) Delete(ctx context.Context, key string) error {
|
||||
func (s *SwiftBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"swift", "list"}, time.Now())
|
||||
|
||||
s.permitPool.Acquire()
|
||||
if err := s.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer s.permitPool.Release()
|
||||
|
||||
list, err := s.client.ObjectNamesAll(s.container, &swift.ObjectsOpts{Prefix: prefix})
|
||||
|
||||
@@ -30,6 +30,7 @@ require (
|
||||
github.com/hashicorp/go-secure-stdlib/mlock v0.1.3
|
||||
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8
|
||||
github.com/hashicorp/go-secure-stdlib/password v0.1.1
|
||||
github.com/hashicorp/go-secure-stdlib/permitpool v1.0.0
|
||||
github.com/hashicorp/go-secure-stdlib/plugincontainer v0.4.1
|
||||
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2
|
||||
github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.3
|
||||
@@ -44,7 +45,7 @@ require (
|
||||
github.com/pierrec/lz4 v2.6.1+incompatible
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/ryanuber/go-glob v1.0.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/tink-crypto/tink-go/v2 v2.2.0
|
||||
go.uber.org/atomic v1.11.0
|
||||
golang.org/x/crypto v0.31.0
|
||||
|
||||
@@ -200,6 +200,10 @@ github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8 h1:iBt4Ew4XEGLfh6/bPk4rSY
|
||||
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8/go.mod h1:aiJI+PIApBRQG7FZTEBx5GiiX+HbOHilUdNxUZi4eV0=
|
||||
github.com/hashicorp/go-secure-stdlib/password v0.1.1 h1:6JzmBqXprakgFEHwBgdchsjaA9x3GyjdI568bXKxa60=
|
||||
github.com/hashicorp/go-secure-stdlib/password v0.1.1/go.mod h1:9hH302QllNwu1o2TGYtSk8I8kTAN0ca1EHpwhm5Mmzo=
|
||||
github.com/hashicorp/go-secure-stdlib/permitpool v0.0.0-20250109173936-1ecdd6ad783f h1:FC0HhfmXKlQh/21KJFhGsvrOyNY1JEKRobE93wP5cKQ=
|
||||
github.com/hashicorp/go-secure-stdlib/permitpool v0.0.0-20250109173936-1ecdd6ad783f/go.mod h1:ecDb3o+8D4xtP0nTCufJaAVawHavy5M2eZ64Nq/8/LM=
|
||||
github.com/hashicorp/go-secure-stdlib/permitpool v1.0.0 h1:U6y5MXGiDVOOtkWJ6o/tu1TxABnI0yKTQWJr7z6BpNk=
|
||||
github.com/hashicorp/go-secure-stdlib/permitpool v1.0.0/go.mod h1:ecDb3o+8D4xtP0nTCufJaAVawHavy5M2eZ64Nq/8/LM=
|
||||
github.com/hashicorp/go-secure-stdlib/plugincontainer v0.4.1 h1:JY+zGg8gOmslwif1fiCqT5Hu1SikLZQcHkmQhCoA9gY=
|
||||
github.com/hashicorp/go-secure-stdlib/plugincontainer v0.4.1/go.mod h1:jW3KCTvdPyAdVecOUwiiO2XaYgUJ/isigt++ISkszkY=
|
||||
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 h1:kes8mmyCpxJsI7FTwtzRqEy9CdjCtrXrXGuOpxEA7Ts=
|
||||
@@ -444,8 +448,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
||||
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/tink-crypto/tink-go/v2 v2.2.0 h1:L2Da0F2Udh2agtKztdr69mV/KpnY3/lGTkMgLTVIXlA=
|
||||
github.com/tink-crypto/tink-go/v2 v2.2.0/go.mod h1:JJ6PomeNPF3cJpfWC0lgyTES6zpJILkAX0cJNwlS3xU=
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
|
||||
"github.com/hashicorp/errwrap"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/sdk/helper/jsonutil"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
@@ -40,7 +41,7 @@ type FileBackend struct {
|
||||
sync.RWMutex
|
||||
path string
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
}
|
||||
|
||||
type TransactionalFileBackend struct {
|
||||
@@ -61,7 +62,7 @@ func NewFileBackend(conf map[string]string, logger log.Logger) (physical.Backend
|
||||
return &FileBackend{
|
||||
path: path,
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
|
||||
permitPool: permitpool.New(physical.DefaultParallelOperations),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -76,13 +77,15 @@ func NewTransactionalFileBackend(conf map[string]string, logger log.Logger) (phy
|
||||
FileBackend: FileBackend{
|
||||
path: path,
|
||||
logger: logger,
|
||||
permitPool: physical.NewPermitPool(1),
|
||||
permitPool: permitpool.New(1),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *FileBackend) Delete(ctx context.Context, path string) error {
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
b.Lock()
|
||||
@@ -157,7 +160,9 @@ func (b *FileBackend) cleanupLogicalPath(path string) error {
|
||||
}
|
||||
|
||||
func (b *FileBackend) Get(ctx context.Context, k string) (*physical.Entry, error) {
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
b.RLock()
|
||||
@@ -216,7 +221,9 @@ func (b *FileBackend) GetInternal(ctx context.Context, k string) (*physical.Entr
|
||||
}
|
||||
|
||||
func (b *FileBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
b.Lock()
|
||||
@@ -295,7 +302,9 @@ func (b *FileBackend) PutInternal(ctx context.Context, entry *physical.Entry) er
|
||||
}
|
||||
|
||||
func (b *FileBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
b.RLock()
|
||||
@@ -376,7 +385,9 @@ func (b *FileBackend) validatePath(path string) error {
|
||||
}
|
||||
|
||||
func (b *TransactionalFileBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
|
||||
b.permitPool.Acquire()
|
||||
if err := b.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.permitPool.Release()
|
||||
|
||||
b.Lock()
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/armon/go-radix"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
uberAtomic "go.uber.org/atomic"
|
||||
)
|
||||
@@ -46,7 +47,7 @@ var (
|
||||
type InmemBackend struct {
|
||||
sync.RWMutex
|
||||
root *radix.Tree
|
||||
permitPool *physical.PermitPool
|
||||
permitPool *permitpool.Pool
|
||||
logger log.Logger
|
||||
failGet *uint32
|
||||
failPut *uint32
|
||||
@@ -90,7 +91,7 @@ func NewInmem(conf map[string]string, logger log.Logger) (physical.Backend, erro
|
||||
|
||||
return &InmemBackend{
|
||||
root: radix.New(),
|
||||
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
|
||||
permitPool: permitpool.New(physical.DefaultParallelOperations),
|
||||
logger: logger,
|
||||
failGet: new(uint32),
|
||||
failPut: new(uint32),
|
||||
@@ -118,7 +119,7 @@ func NewTransactionalInmem(conf map[string]string, logger log.Logger) (physical.
|
||||
return &TransactionalInmemBackend{
|
||||
InmemBackend: InmemBackend{
|
||||
root: radix.New(),
|
||||
permitPool: physical.NewPermitPool(1),
|
||||
permitPool: permitpool.New(1),
|
||||
logger: logger,
|
||||
failGet: new(uint32),
|
||||
failPut: new(uint32),
|
||||
@@ -149,7 +150,9 @@ func (i *InmemBackend) SetWriteLatency(latency time.Duration) {
|
||||
|
||||
// Put is used to insert or update an entry
|
||||
func (i *InmemBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
i.permitPool.Acquire()
|
||||
if err := i.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer i.permitPool.Release()
|
||||
|
||||
i.Lock()
|
||||
@@ -193,7 +196,9 @@ func (i *InmemBackend) FailPut(fail bool) {
|
||||
|
||||
// Get is used to fetch an entry
|
||||
func (i *InmemBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
i.permitPool.Acquire()
|
||||
if err := i.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer i.permitPool.Release()
|
||||
|
||||
i.RLock()
|
||||
@@ -243,7 +248,9 @@ func (i *InmemBackend) FailGetInTxn(fail bool) {
|
||||
|
||||
// Delete is used to permanently delete an entry
|
||||
func (i *InmemBackend) Delete(ctx context.Context, key string) error {
|
||||
i.permitPool.Acquire()
|
||||
if err := i.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer i.permitPool.Release()
|
||||
|
||||
i.Lock()
|
||||
@@ -283,7 +290,9 @@ func (i *InmemBackend) FailDelete(fail bool) {
|
||||
// List is used to list all the keys under a given
|
||||
// prefix, up to the next prefix.
|
||||
func (i *InmemBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
i.permitPool.Acquire()
|
||||
if err := i.permitPool.Acquire(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer i.permitPool.Release()
|
||||
|
||||
i.RLock()
|
||||
@@ -355,7 +364,9 @@ func (i *InmemBackend) GetMountTablePaths() []string {
|
||||
|
||||
// Transaction implements the transaction interface
|
||||
func (t *TransactionalInmemBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
|
||||
t.permitPool.Acquire()
|
||||
if err := t.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer t.permitPool.Release()
|
||||
|
||||
t.Lock()
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
radix "github.com/armon/go-radix"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
"github.com/hashicorp/vault/sdk/helper/logging"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
)
|
||||
@@ -59,7 +60,9 @@ func (f *faultyPseudo) List(ctx context.Context, prefix string) ([]string, error
|
||||
}
|
||||
|
||||
func (f *faultyPseudo) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
|
||||
f.underlying.permitPool.Acquire()
|
||||
if err := f.underlying.permitPool.Acquire(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.underlying.permitPool.Release()
|
||||
|
||||
f.underlying.Lock()
|
||||
@@ -72,7 +75,7 @@ func newFaultyPseudo(logger log.Logger, faultyPaths []string) *faultyPseudo {
|
||||
out := &faultyPseudo{
|
||||
underlying: InmemBackend{
|
||||
root: radix.New(),
|
||||
permitPool: physical.NewPermitPool(1),
|
||||
permitPool: permitpool.New(1),
|
||||
logger: logger.Named("storage.inmembackend"),
|
||||
failGet: new(uint32),
|
||||
failPut: new(uint32),
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"strings"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-secure-stdlib/permitpool"
|
||||
)
|
||||
|
||||
const DefaultParallelOperations = 128
|
||||
@@ -186,34 +187,24 @@ type Lock interface {
|
||||
type Factory func(config map[string]string, logger log.Logger) (Backend, error)
|
||||
|
||||
// PermitPool is used to limit maximum outstanding requests
|
||||
// Deprecated: use permitpool.Pool from go-secure-stdlib.
|
||||
type PermitPool struct {
|
||||
sem chan int
|
||||
*permitpool.Pool
|
||||
}
|
||||
|
||||
// NewPermitPool returns a new permit pool with the provided
|
||||
// number of permits
|
||||
// number of permits.
|
||||
// Deprecated: use permitpool.New from go-secure-stdlib.
|
||||
func NewPermitPool(permits int) *PermitPool {
|
||||
if permits < 1 {
|
||||
permits = DefaultParallelOperations
|
||||
}
|
||||
return &PermitPool{
|
||||
sem: make(chan int, permits),
|
||||
Pool: permitpool.New(permits),
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire returns when a permit has been acquired
|
||||
// Deprecated: use permitpool.Acquire from go-secure-stdlib.
|
||||
func (c *PermitPool) Acquire() {
|
||||
c.sem <- 1
|
||||
}
|
||||
|
||||
// Release returns a permit to the pool
|
||||
func (c *PermitPool) Release() {
|
||||
<-c.sem
|
||||
}
|
||||
|
||||
// Get number of requests in the permit pool
|
||||
func (c *PermitPool) CurrentPermits() int {
|
||||
return len(c.sem)
|
||||
_ = c.Pool.Acquire(context.Background())
|
||||
}
|
||||
|
||||
// Prefixes is a shared helper function returns all parent 'folders' for a
|
||||
|
||||
Reference in New Issue
Block a user