mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-30 02:02:43 +00:00
Add support for larger transactions in Raft (#24991)
* Add support for larger transactions in Raft * Add CHANGELOG * Appease the new lint rules
This commit is contained in:
3
changelog/24991.txt
Normal file
3
changelog/24991.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:improvement
|
||||
storage/raft: Add support for larger transactions when using raft storage.
|
||||
```
|
||||
@@ -41,10 +41,11 @@ const (
|
||||
|
||||
// Verify ConsulBackend satisfies the correct interfaces
|
||||
var (
|
||||
_ physical.Backend = (*ConsulBackend)(nil)
|
||||
_ physical.FencingHABackend = (*ConsulBackend)(nil)
|
||||
_ physical.Lock = (*ConsulLock)(nil)
|
||||
_ physical.Transactional = (*ConsulBackend)(nil)
|
||||
_ physical.Backend = (*ConsulBackend)(nil)
|
||||
_ physical.FencingHABackend = (*ConsulBackend)(nil)
|
||||
_ physical.Lock = (*ConsulLock)(nil)
|
||||
_ physical.Transactional = (*ConsulBackend)(nil)
|
||||
_ physical.TransactionalLimits = (*ConsulBackend)(nil)
|
||||
|
||||
GetInTxnDisabledError = errors.New("get operations inside transactions are disabled in consul backend")
|
||||
)
|
||||
@@ -430,6 +431,15 @@ func (c *ConsulBackend) makeApiTxn(txn *physical.TxnEntry) (*api.TxnOp, error) {
|
||||
return &api.TxnOp{KV: op}, nil
|
||||
}
|
||||
|
||||
func (c *ConsulBackend) TransactionLimits() (int, int) {
|
||||
// Note that even for modern Consul versions that support 128 entries per txn,
|
||||
// we have an effective limit of 64 write operations because the other 64 are
|
||||
// used for undo log read operations. We also reserve 1 for a check-session
|
||||
// operation to prevent split brain so the most we allow WAL to put in a batch
|
||||
// is 63.
|
||||
return 63, 128 * 1024
|
||||
}
|
||||
|
||||
// Put is used to insert or update an entry
|
||||
func (c *ConsulBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
txns := []*physical.TxnEntry{
|
||||
|
||||
@@ -159,6 +159,10 @@ func TestConsul_newConsulBackend(t *testing.T) {
|
||||
// if test.max_parallel != cap(c.permitPool) {
|
||||
// t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool))
|
||||
// }
|
||||
|
||||
maxEntries, maxBytes := be.(physical.TransactionalLimits).TransactionLimits()
|
||||
require.Equal(t, 63, maxEntries)
|
||||
require.Equal(t, 128*1024, maxBytes)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -52,16 +52,36 @@ const (
|
||||
// EnvVaultRaftNonVoter is used to override the non_voter config option, telling Vault to join as a non-voter (i.e. read replica).
|
||||
EnvVaultRaftNonVoter = "VAULT_RAFT_RETRY_JOIN_AS_NON_VOTER"
|
||||
raftNonVoterConfigKey = "retry_join_as_non_voter"
|
||||
|
||||
// EnvVaultRaftMaxBatchEntries is used to override the default maxBatchEntries
|
||||
// limit.
|
||||
EnvVaultRaftMaxBatchEntries = "VAULT_RAFT_MAX_BATCH_ENTRIES"
|
||||
|
||||
// EnvVaultRaftMaxBatchSizeBytes is used to override the default maxBatchSize
|
||||
// limit.
|
||||
EnvVaultRaftMaxBatchSizeBytes = "VAULT_RAFT_MAX_BATCH_SIZE_BYTES"
|
||||
|
||||
// defaultMaxBatchEntries is the default maxBatchEntries limit. This was
|
||||
// derived from performance testing. It is effectively high enough never to be
|
||||
// a real limit for realistic Vault operation sizes and the size limit
|
||||
// provides the actual limit since that amount of data stored is more relevant
|
||||
// that the specific number of operations.
|
||||
defaultMaxBatchEntries = 4096
|
||||
|
||||
// defaultMaxBatchSize is the default maxBatchSize limit. This was derived
|
||||
// from performance testing.
|
||||
defaultMaxBatchSize = 128 * 1024
|
||||
)
|
||||
|
||||
var getMmapFlags = func(string) int { return 0 }
|
||||
|
||||
// Verify RaftBackend satisfies the correct interfaces
|
||||
var (
|
||||
_ physical.Backend = (*RaftBackend)(nil)
|
||||
_ physical.Transactional = (*RaftBackend)(nil)
|
||||
_ physical.HABackend = (*RaftBackend)(nil)
|
||||
_ physical.Lock = (*RaftLock)(nil)
|
||||
_ physical.Backend = (*RaftBackend)(nil)
|
||||
_ physical.Transactional = (*RaftBackend)(nil)
|
||||
_ physical.TransactionalLimits = (*RaftBackend)(nil)
|
||||
_ physical.HABackend = (*RaftBackend)(nil)
|
||||
_ physical.Lock = (*RaftLock)(nil)
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -141,6 +161,17 @@ type RaftBackend struct {
|
||||
// performance.
|
||||
maxEntrySize uint64
|
||||
|
||||
// maxBatchEntries is the number of operation entries in each batch. It is set
|
||||
// by default to a value we've tested to work well but may be overridden by
|
||||
// Environment variable VAULT_RAFT_MAX_BATCH_ENTRIES.
|
||||
maxBatchEntries int
|
||||
|
||||
// maxBatchSize is the maximum combined key and value size of operation
|
||||
// entries in each batch. It is set by default to a value we've tested to work
|
||||
// well but may be overridden by Environment variable
|
||||
// VAULT_RAFT_MAX_BATCH_SIZE_BYTES.
|
||||
maxBatchSize int
|
||||
|
||||
// autopilot is the instance of raft-autopilot library implementation of the
|
||||
// autopilot features. This will be instantiated in both leader and followers.
|
||||
// However, only active node will have a "running" autopilot.
|
||||
@@ -339,6 +370,30 @@ func (c *ClusterAddrBridge) ServerAddr(id raft.ServerID) (raft.ServerAddress, er
|
||||
return "", fmt.Errorf("could not find cluster addr for id=%s", id)
|
||||
}
|
||||
|
||||
func batchLimitsFromEnv(logger log.Logger) (int, int) {
|
||||
maxBatchEntries := defaultMaxBatchEntries
|
||||
if envVal := os.Getenv(EnvVaultRaftMaxBatchEntries); envVal != "" {
|
||||
if i, err := strconv.Atoi(envVal); err == nil && i > 0 {
|
||||
maxBatchEntries = i
|
||||
} else {
|
||||
logger.Warn("failed to parse VAULT_RAFT_MAX_BATCH_ENTRIES as an integer > 0. Using default value.",
|
||||
"env_val", envVal, "default_used", maxBatchEntries)
|
||||
}
|
||||
}
|
||||
|
||||
maxBatchSize := defaultMaxBatchSize
|
||||
if envVal := os.Getenv(EnvVaultRaftMaxBatchSizeBytes); envVal != "" {
|
||||
if i, err := strconv.Atoi(envVal); err == nil && i > 0 {
|
||||
maxBatchSize = i
|
||||
} else {
|
||||
logger.Warn("failed to parse VAULT_RAFT_MAX_BATCH_SIZE_BYTES as an integer > 0. Using default value.",
|
||||
"env_val", envVal, "default_used", maxBatchSize)
|
||||
}
|
||||
}
|
||||
|
||||
return maxBatchEntries, maxBatchSize
|
||||
}
|
||||
|
||||
// NewRaftBackend constructs a RaftBackend using the given directory
|
||||
func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
|
||||
path := os.Getenv(EnvVaultRaftPath)
|
||||
@@ -531,6 +586,8 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
|
||||
return nil, fmt.Errorf("setting %s to true is only valid if at least one retry_join stanza is specified", raftNonVoterConfigKey)
|
||||
}
|
||||
|
||||
maxBatchEntries, maxBatchSize := batchLimitsFromEnv(logger)
|
||||
|
||||
return &RaftBackend{
|
||||
logger: logger,
|
||||
fsm: fsm,
|
||||
@@ -543,6 +600,8 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
|
||||
localID: localID,
|
||||
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
|
||||
maxEntrySize: maxEntrySize,
|
||||
maxBatchEntries: maxBatchEntries,
|
||||
maxBatchSize: maxBatchSize,
|
||||
followerHeartbeatTicker: time.NewTicker(time.Second),
|
||||
autopilotReconcileInterval: reconcileInterval,
|
||||
autopilotUpdateInterval: updateInterval,
|
||||
@@ -1674,6 +1733,10 @@ func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *RaftBackend) TransactionLimits() (int, int) {
|
||||
return b.maxBatchEntries, b.maxBatchSize
|
||||
}
|
||||
|
||||
// applyLog will take a given log command and apply it to the raft log. applyLog
|
||||
// doesn't return until the log has been applied to a quorum of servers and is
|
||||
// persisted to the local FSM. Caller should hold the backend's read lock.
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/vault/sdk/helper/jsonutil"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func connectPeers(nodes ...*RaftBackend) {
|
||||
@@ -634,6 +635,88 @@ func TestRaft_TransactionalBackend_ThreeNode(t *testing.T) {
|
||||
compareFSMs(t, raft1.fsm, raft3.fsm)
|
||||
}
|
||||
|
||||
// TestRaft_TransactionalLimitsEnvOverride ensures the ENV var overrides for
|
||||
// transaction size limits are plumbed through as expected.
|
||||
func TestRaft_TransactionalLimitsEnvOverride(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
envEntries string
|
||||
envSize string
|
||||
wantEntries int
|
||||
wantSize int
|
||||
wantLog string
|
||||
}{
|
||||
{
|
||||
name: "defaults",
|
||||
wantEntries: defaultMaxBatchEntries,
|
||||
wantSize: defaultMaxBatchSize,
|
||||
},
|
||||
{
|
||||
name: "valid env",
|
||||
envEntries: "123",
|
||||
envSize: "456",
|
||||
wantEntries: 123,
|
||||
wantSize: 456,
|
||||
},
|
||||
{
|
||||
name: "invalid entries",
|
||||
envEntries: "not-a-number",
|
||||
envSize: "100",
|
||||
wantEntries: defaultMaxBatchEntries,
|
||||
wantSize: 100,
|
||||
wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_ENTRIES",
|
||||
},
|
||||
{
|
||||
name: "invalid entries",
|
||||
envEntries: "100",
|
||||
envSize: "asdasdsasd",
|
||||
wantEntries: 100,
|
||||
wantSize: defaultMaxBatchSize,
|
||||
wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_SIZE_BYTES",
|
||||
},
|
||||
{
|
||||
name: "zero entries",
|
||||
envEntries: "0",
|
||||
envSize: "100",
|
||||
wantEntries: defaultMaxBatchEntries,
|
||||
wantSize: 100,
|
||||
wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_ENTRIES as an integer > 0",
|
||||
},
|
||||
{
|
||||
name: "zero size",
|
||||
envEntries: "100",
|
||||
envSize: "0",
|
||||
wantEntries: 100,
|
||||
wantSize: defaultMaxBatchSize,
|
||||
wantLog: "failed to parse VAULT_RAFT_MAX_BATCH_SIZE_BYTES as an integer > 0",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tc {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Set the env vars within this test
|
||||
if tt.envEntries != "" {
|
||||
t.Setenv(EnvVaultRaftMaxBatchEntries, tt.envEntries)
|
||||
}
|
||||
if tt.envSize != "" {
|
||||
t.Setenv(EnvVaultRaftMaxBatchSizeBytes, tt.envSize)
|
||||
}
|
||||
|
||||
var logBuf bytes.Buffer
|
||||
raft1, dir := GetRaftWithLogOutput(t, false, true, &logBuf)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
e, s := raft1.TransactionLimits()
|
||||
|
||||
require.Equal(t, tt.wantEntries, e)
|
||||
require.Equal(t, tt.wantSize, s)
|
||||
if tt.wantLog != "" {
|
||||
require.Contains(t, logBuf.String(), tt.wantLog)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRaft_Backend_Performance(t *testing.T) {
|
||||
b, dir := GetRaft(t, true, false)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
@@ -6,6 +6,7 @@ package raft
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
@@ -20,18 +21,29 @@ func GetRaft(t testing.TB, bootstrap bool, noStoreState bool) (*RaftBackend, str
|
||||
}
|
||||
t.Logf("raft dir: %s", raftDir)
|
||||
|
||||
return getRaftWithDir(t, bootstrap, noStoreState, raftDir)
|
||||
return getRaftWithDirAndLogOutput(t, bootstrap, noStoreState, raftDir, nil)
|
||||
}
|
||||
|
||||
func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir string) (*RaftBackend, string) {
|
||||
func GetRaftWithLogOutput(t testing.TB, bootstrap bool, noStoreState bool, logOutput io.Writer) (*RaftBackend, string) {
|
||||
raftDir, err := ioutil.TempDir("", "vault-raft-")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("raft dir: %s", raftDir)
|
||||
|
||||
return getRaftWithDirAndLogOutput(t, bootstrap, noStoreState, raftDir, logOutput)
|
||||
}
|
||||
|
||||
func getRaftWithDirAndLogOutput(t testing.TB, bootstrap bool, noStoreState bool, raftDir string, logOutput io.Writer) (*RaftBackend, string) {
|
||||
id, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Name: fmt.Sprintf("raft-%s", id),
|
||||
Level: hclog.Trace,
|
||||
Name: fmt.Sprintf("raft-%s", id),
|
||||
Level: hclog.Trace,
|
||||
Output: logOutput,
|
||||
})
|
||||
logger.Info("raft dir", "dir", raftDir)
|
||||
|
||||
|
||||
@@ -86,6 +86,7 @@ var (
|
||||
_ ToggleablePurgemonster = (*TransactionalCache)(nil)
|
||||
_ Backend = (*Cache)(nil)
|
||||
_ Transactional = (*TransactionalCache)(nil)
|
||||
_ TransactionalLimits = (*TransactionalCache)(nil)
|
||||
)
|
||||
|
||||
// NewCache returns a physical cache of the given size.
|
||||
@@ -271,3 +272,14 @@ func (c *TransactionalCache) Transaction(ctx context.Context, txns []*TxnEntry)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TransactionLimits implements physical.TransactionalLimits
|
||||
func (c *TransactionalCache) TransactionLimits() (int, int) {
|
||||
if tl, ok := c.Transactional.(TransactionalLimits); ok {
|
||||
return tl.TransactionLimits()
|
||||
}
|
||||
// We don't have any specific limits of our own so return zeros to signal that
|
||||
// the caller should use whatever reasonable defaults it would if it used a
|
||||
// non-TransactionalLimits backend.
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
54
sdk/physical/cache_test.go
Normal file
54
sdk/physical/cache_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package physical
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTransactionalCache_TransactionLimits(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
be Backend
|
||||
wantEntries int
|
||||
wantSize int
|
||||
}{
|
||||
{
|
||||
name: "non-transactionlimits backend",
|
||||
be: &TestTransactionalNonLimitBackend{},
|
||||
|
||||
// Should return zeros to let the implementor choose defaults.
|
||||
wantEntries: 0,
|
||||
wantSize: 0,
|
||||
},
|
||||
{
|
||||
name: "transactionlimits backend",
|
||||
be: &TestTransactionalLimitBackend{
|
||||
MaxEntries: 123,
|
||||
MaxSize: 345,
|
||||
},
|
||||
|
||||
// Should return underlying limits
|
||||
wantEntries: 123,
|
||||
wantSize: 345,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tc {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
logger := hclog.NewNullLogger()
|
||||
|
||||
be := NewTransactionalCache(tt.be, 1024, logger, nil)
|
||||
|
||||
// Call the TransactionLimits method
|
||||
maxEntries, maxBytes := be.TransactionLimits()
|
||||
|
||||
require.Equal(t, tt.wantEntries, maxEntries)
|
||||
require.Equal(t, tt.wantSize, maxBytes)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -98,6 +98,17 @@ func (e *TransactionalStorageEncoding) Transaction(ctx context.Context, txns []*
|
||||
return e.Transactional.Transaction(ctx, txns)
|
||||
}
|
||||
|
||||
// TransactionLimits implements physical.TransactionalLimits
|
||||
func (e *TransactionalStorageEncoding) TransactionLimits() (int, int) {
|
||||
if tl, ok := e.Transactional.(TransactionalLimits); ok {
|
||||
return tl.TransactionLimits()
|
||||
}
|
||||
// We don't have any specific limits of our own so return zeros to signal that
|
||||
// the caller should use whatever reasonable defaults it would if it used a
|
||||
// non-TransactionalLimits backend.
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
func (e *StorageEncoding) Purge(ctx context.Context) {
|
||||
if purgeable, ok := e.Backend.(ToggleablePurgemonster); ok {
|
||||
purgeable.Purge(ctx)
|
||||
|
||||
51
sdk/physical/encoding_test.go
Normal file
51
sdk/physical/encoding_test.go
Normal file
@@ -0,0 +1,51 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package physical
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTransactionalStorageEncoding_TransactionLimits(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
be Backend
|
||||
wantEntries int
|
||||
wantSize int
|
||||
}{
|
||||
{
|
||||
name: "non-transactionlimits backend",
|
||||
be: &TestTransactionalNonLimitBackend{},
|
||||
|
||||
// Should return zeros to let the implementor choose defaults.
|
||||
wantEntries: 0,
|
||||
wantSize: 0,
|
||||
},
|
||||
{
|
||||
name: "transactionlimits backend",
|
||||
be: &TestTransactionalLimitBackend{
|
||||
MaxEntries: 123,
|
||||
MaxSize: 345,
|
||||
},
|
||||
|
||||
// Should return underlying limits
|
||||
wantEntries: 123,
|
||||
wantSize: 345,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tc {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
be := NewStorageEncoding(tt.be).(TransactionalLimits)
|
||||
|
||||
// Call the TransactionLimits method
|
||||
maxEntries, maxBytes := be.TransactionLimits()
|
||||
|
||||
require.Equal(t, tt.wantEntries, maxEntries)
|
||||
require.Equal(t, tt.wantSize, maxBytes)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -111,3 +111,14 @@ func (e *TransactionalErrorInjector) Transaction(ctx context.Context, txns []*Tx
|
||||
}
|
||||
return e.Transactional.Transaction(ctx, txns)
|
||||
}
|
||||
|
||||
// TransactionLimits implements physical.TransactionalLimits
|
||||
func (e *TransactionalErrorInjector) TransactionLimits() (int, int) {
|
||||
if tl, ok := e.Transactional.(TransactionalLimits); ok {
|
||||
return tl.TransactionLimits()
|
||||
}
|
||||
// We don't have any specific limits of our own so return zeros to signal that
|
||||
// the caller should use whatever reasonable defaults it would if it used a
|
||||
// non-TransactionalLimits backend.
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
53
sdk/physical/error_test.go
Normal file
53
sdk/physical/error_test.go
Normal file
@@ -0,0 +1,53 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package physical
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTransactionalErrorInjector_TransactionLimits(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
be Backend
|
||||
wantEntries int
|
||||
wantSize int
|
||||
}{
|
||||
{
|
||||
name: "non-transactionlimits backend",
|
||||
be: &TestTransactionalNonLimitBackend{},
|
||||
|
||||
// Should return zeros to let the implementor choose defaults.
|
||||
wantEntries: 0,
|
||||
wantSize: 0,
|
||||
},
|
||||
{
|
||||
name: "transactionlimits backend",
|
||||
be: &TestTransactionalLimitBackend{
|
||||
MaxEntries: 123,
|
||||
MaxSize: 345,
|
||||
},
|
||||
|
||||
// Should return underlying limits
|
||||
wantEntries: 123,
|
||||
wantSize: 345,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tc {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
logger := hclog.NewNullLogger()
|
||||
|
||||
injector := NewTransactionalErrorInjector(tt.be, 0, logger)
|
||||
|
||||
maxEntries, maxBytes := injector.TransactionLimits()
|
||||
|
||||
require.Equal(t, tt.wantEntries, maxEntries)
|
||||
require.Equal(t, tt.wantSize, maxBytes)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -16,16 +16,18 @@ import (
|
||||
"github.com/armon/go-radix"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
uberAtomic "go.uber.org/atomic"
|
||||
)
|
||||
|
||||
// Verify interfaces are satisfied
|
||||
var (
|
||||
_ physical.Backend = (*InmemBackend)(nil)
|
||||
_ physical.HABackend = (*InmemHABackend)(nil)
|
||||
_ physical.HABackend = (*TransactionalInmemHABackend)(nil)
|
||||
_ physical.Lock = (*InmemLock)(nil)
|
||||
_ physical.Transactional = (*TransactionalInmemBackend)(nil)
|
||||
_ physical.Transactional = (*TransactionalInmemHABackend)(nil)
|
||||
_ physical.Backend = (*InmemBackend)(nil)
|
||||
_ physical.HABackend = (*InmemHABackend)(nil)
|
||||
_ physical.HABackend = (*TransactionalInmemHABackend)(nil)
|
||||
_ physical.Lock = (*InmemLock)(nil)
|
||||
_ physical.Transactional = (*TransactionalInmemBackend)(nil)
|
||||
_ physical.Transactional = (*TransactionalInmemHABackend)(nil)
|
||||
_ physical.TransactionalLimits = (*TransactionalInmemBackend)(nil)
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -55,6 +57,16 @@ type InmemBackend struct {
|
||||
|
||||
type TransactionalInmemBackend struct {
|
||||
InmemBackend
|
||||
|
||||
// Using Uber atomic because our SemGrep rules don't like the old pointer
|
||||
// trick we used above any more even though it's fine. The newer sync/atomic
|
||||
// types are almost the same, but lack was to initialize them cleanly in New*
|
||||
// functions so sticking with what SemGrep likes for now.
|
||||
maxBatchEntries *uberAtomic.Int32
|
||||
maxBatchSize *uberAtomic.Int32
|
||||
|
||||
largestBatchLen *uberAtomic.Uint64
|
||||
largestBatchSize *uberAtomic.Uint64
|
||||
}
|
||||
|
||||
// NewInmem constructs a new in-memory backend
|
||||
@@ -109,6 +121,11 @@ func NewTransactionalInmem(conf map[string]string, logger log.Logger) (physical.
|
||||
logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "",
|
||||
maxValueSize: maxValueSize,
|
||||
},
|
||||
|
||||
maxBatchEntries: uberAtomic.NewInt32(64),
|
||||
maxBatchSize: uberAtomic.NewInt32(128 * 1024),
|
||||
largestBatchLen: uberAtomic.NewUint64(0),
|
||||
largestBatchSize: uberAtomic.NewUint64(0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -303,11 +320,39 @@ func (t *TransactionalInmemBackend) Transaction(ctx context.Context, txns []*phy
|
||||
defer t.Unlock()
|
||||
|
||||
failGetInTxn := atomic.LoadUint32(t.failGetInTxn)
|
||||
size := uint64(0)
|
||||
for _, t := range txns {
|
||||
// We use 2x key length to match the logic in WALBackend.persistWALs
|
||||
// presumably this is attempting to account for some amount of encoding
|
||||
// overhead.
|
||||
size += uint64(2*len(t.Entry.Key) + len(t.Entry.Value))
|
||||
if t.Operation == physical.GetOperation && failGetInTxn != 0 {
|
||||
return GetInTxnDisabledError
|
||||
}
|
||||
}
|
||||
|
||||
if size > t.largestBatchSize.Load() {
|
||||
t.largestBatchSize.Store(size)
|
||||
}
|
||||
if len(txns) > int(t.largestBatchLen.Load()) {
|
||||
t.largestBatchLen.Store(uint64(len(txns)))
|
||||
}
|
||||
|
||||
return physical.GenericTransactionHandler(ctx, t, txns)
|
||||
}
|
||||
|
||||
func (t *TransactionalInmemBackend) SetMaxBatchEntries(entries int) {
|
||||
t.maxBatchEntries.Store(int32(entries))
|
||||
}
|
||||
|
||||
func (t *TransactionalInmemBackend) SetMaxBatchSize(entries int) {
|
||||
t.maxBatchSize.Store(int32(entries))
|
||||
}
|
||||
|
||||
func (t *TransactionalInmemBackend) TransactionLimits() (int, int) {
|
||||
return int(t.maxBatchEntries.Load()), int(t.maxBatchSize.Load())
|
||||
}
|
||||
|
||||
func (t *TransactionalInmemBackend) BatchStats() (maxEntries uint64, maxSize uint64) {
|
||||
return t.largestBatchLen.Load(), t.largestBatchSize.Load()
|
||||
}
|
||||
|
||||
@@ -117,3 +117,14 @@ func (l *TransactionalLatencyInjector) Transaction(ctx context.Context, txns []*
|
||||
l.addLatency()
|
||||
return l.Transactional.Transaction(ctx, txns)
|
||||
}
|
||||
|
||||
// TransactionLimits implements physical.TransactionalLimits
|
||||
func (l *TransactionalLatencyInjector) TransactionLimits() (int, int) {
|
||||
if tl, ok := l.Transactional.(TransactionalLimits); ok {
|
||||
return tl.TransactionLimits()
|
||||
}
|
||||
// We don't have any specific limits of our own so return zeros to signal that
|
||||
// the caller should use whatever reasonable defaults it would if it used a
|
||||
// non-TransactionalLimits backend.
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
53
sdk/physical/latency_test.go
Normal file
53
sdk/physical/latency_test.go
Normal file
@@ -0,0 +1,53 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package physical
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTransactionalLatencyInjector_TransactionLimits(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
be Backend
|
||||
wantEntries int
|
||||
wantSize int
|
||||
}{
|
||||
{
|
||||
name: "non-transactionlimits backend",
|
||||
be: &TestTransactionalNonLimitBackend{},
|
||||
|
||||
// Should return zeros to let the implementor choose defaults.
|
||||
wantEntries: 0,
|
||||
wantSize: 0,
|
||||
},
|
||||
{
|
||||
name: "transactionlimits backend",
|
||||
be: &TestTransactionalLimitBackend{
|
||||
MaxEntries: 123,
|
||||
MaxSize: 345,
|
||||
},
|
||||
|
||||
// Should return underlying limits
|
||||
wantEntries: 123,
|
||||
wantSize: 345,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tc {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
logger := hclog.NewNullLogger()
|
||||
|
||||
injector := NewTransactionalLatencyInjector(tt.be, 0, 0, logger)
|
||||
|
||||
maxEntries, maxBytes := injector.TransactionLimits()
|
||||
|
||||
require.Equal(t, tt.wantEntries, maxEntries)
|
||||
require.Equal(t, tt.wantSize, maxBytes)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -518,3 +518,43 @@ func SetupTestingTransactions(t testing.TB, b Backend) []*TxnEntry {
|
||||
|
||||
return txns
|
||||
}
|
||||
|
||||
// Several tests across packages have to test logic with a few variations of
|
||||
// transactional backends. Make some suitable for testing limits support that
|
||||
// can be re-used.
|
||||
|
||||
type TestTransactionalNonLimitBackend struct{}
|
||||
|
||||
var _ Transactional = (*TestTransactionalNonLimitBackend)(nil)
|
||||
|
||||
func (b *TestTransactionalNonLimitBackend) Put(ctx context.Context, entry *Entry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *TestTransactionalNonLimitBackend) Get(ctx context.Context, key string) (*Entry, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (b *TestTransactionalNonLimitBackend) Delete(ctx context.Context, key string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *TestTransactionalNonLimitBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (b *TestTransactionalNonLimitBackend) Transaction(ctx context.Context, txns []*TxnEntry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type TestTransactionalLimitBackend struct {
|
||||
TestTransactionalNonLimitBackend
|
||||
|
||||
MaxEntries, MaxSize int
|
||||
}
|
||||
|
||||
var _ TransactionalLimits = (*TestTransactionalLimitBackend)(nil)
|
||||
|
||||
func (b *TestTransactionalLimitBackend) TransactionLimits() (int, int) {
|
||||
return b.MaxEntries, b.MaxSize
|
||||
}
|
||||
|
||||
@@ -34,6 +34,35 @@ type TransactionalBackend interface {
|
||||
Transactional
|
||||
}
|
||||
|
||||
// TransactionalLimits SHOULD be implemented by all TransactionalBackend
|
||||
// implementations. It is separate for backwards compatibility reasons since
|
||||
// this in a public SDK module. If a TransactionalBackend does not implement
|
||||
// this, the historic default limits of 63 entries and 128kb (based on Consul's
|
||||
// limits) are used by replication internals when encoding batches of
|
||||
// transactions.
|
||||
type TransactionalLimits interface {
|
||||
TransactionalBackend
|
||||
|
||||
// TransactionLimits must return the limits of how large each transaction may
|
||||
// be. The limits returned indicate how many individual operation entries are
|
||||
// supported in total and an overall size limit on the contents of each
|
||||
// transaction if applicable. Vault will deduct any meta-operations it needs
|
||||
// to add from the maxEntries given. maxSize will be compared against the sum
|
||||
// of the key and value sizes for all operations in a transaction. The backend
|
||||
// should provide a reasonable margin of safety for any overhead it may have
|
||||
// while encoding, for example Consul's encoded transaction in JSON must fit
|
||||
// in the configured max transaction size so it must leave adequate room for
|
||||
// JSON encoding overhead on top of the raw key and value sizes.
|
||||
//
|
||||
// If zero is returned for either value, the replication internals will use
|
||||
// historic reasonable defaults. This allows middleware implementations such
|
||||
// as cache layers to either pass through to the underlying backend if it
|
||||
// implements this interface, or to return zeros to indicate that the
|
||||
// implementer should apply whatever defaults it would use if the middleware
|
||||
// were not present.
|
||||
TransactionLimits() (maxEntries int, maxSize int)
|
||||
}
|
||||
|
||||
type PseudoTransactional interface {
|
||||
// An internal function should do no locking or permit pool acquisition.
|
||||
// Depending on the backend and if it natively supports transactions, these
|
||||
|
||||
Reference in New Issue
Block a user