Port over some SP v2 bits (#6516)

* Port over some SP v2 bits

Specifically:

* Add too-large handling to Physical (Consul only for now)
* Contextify some identity funcs
* Update SP protos

* Add size limiting to inmem storage
This commit is contained in:
Jeff Mitchell
2019-05-01 13:47:41 -04:00
committed by GitHub
parent a84a16da5c
commit f7bb5a2e56
21 changed files with 337 additions and 133 deletions

View File

@@ -142,7 +142,7 @@ func (s *StoragePacker) BucketKey(itemID string) string {
// DeleteItem removes the storage entry which the given key refers to from its // DeleteItem removes the storage entry which the given key refers to from its
// corresponding bucket. // corresponding bucket.
func (s *StoragePacker) DeleteItem(itemID string) error { func (s *StoragePacker) DeleteItem(_ context.Context, itemID string) error {
if itemID == "" { if itemID == "" {
return fmt.Errorf("empty item ID") return fmt.Errorf("empty item ID")
@@ -269,7 +269,7 @@ func (s *StoragePacker) GetItem(itemID string) (*Item, error) {
} }
// PutItem stores a storage entry in its corresponding bucket // PutItem stores a storage entry in its corresponding bucket
func (s *StoragePacker) PutItem(item *Item) error { func (s *StoragePacker) PutItem(_ context.Context, item *Item) error {
if item == nil { if item == nil {
return fmt.Errorf("nil item") return fmt.Errorf("nil item")
} }

View File

@@ -1,6 +1,7 @@
package storagepacker package storagepacker
import ( import (
"context"
"testing" "testing"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@@ -17,6 +18,8 @@ func BenchmarkStoragePacker(b *testing.B) {
b.Fatal(err) b.Fatal(err)
} }
ctx := context.Background()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
itemID, err := uuid.GenerateUUID() itemID, err := uuid.GenerateUUID()
if err != nil { if err != nil {
@@ -27,7 +30,7 @@ func BenchmarkStoragePacker(b *testing.B) {
ID: itemID, ID: itemID,
} }
err = storagePacker.PutItem(item) err = storagePacker.PutItem(ctx, item)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
@@ -45,7 +48,7 @@ func BenchmarkStoragePacker(b *testing.B) {
b.Fatalf("bad: item ID; expected: %q\n actual: %q", item.ID, fetchedItem.ID) b.Fatalf("bad: item ID; expected: %q\n actual: %q", item.ID, fetchedItem.ID)
} }
err = storagePacker.DeleteItem(item.ID) err = storagePacker.DeleteItem(ctx, item.ID)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
@@ -66,12 +69,14 @@ func TestStoragePacker(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
ctx := context.Background()
// Persist a storage entry // Persist a storage entry
item1 := &Item{ item1 := &Item{
ID: "item1", ID: "item1",
} }
err = storagePacker.PutItem(item1) err = storagePacker.PutItem(ctx, item1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -90,7 +95,7 @@ func TestStoragePacker(t *testing.T) {
} }
// Delete item1 // Delete item1
err = storagePacker.DeleteItem(item1.ID) err = storagePacker.DeleteItem(ctx, item1.ID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -112,6 +117,8 @@ func TestStoragePacker_SerializeDeserializeComplexItem(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
ctx := context.Background()
timeNow := ptypes.TimestampNow() timeNow := ptypes.TimestampNow()
alias1 := &identity.Alias{ alias1 := &identity.Alias{
@@ -147,7 +154,7 @@ func TestStoragePacker_SerializeDeserializeComplexItem(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = storagePacker.PutItem(&Item{ err = storagePacker.PutItem(ctx, &Item{
ID: entity.ID, ID: entity.ID,
Message: marshaledEntity, Message: marshaledEntity,
}) })

View File

@@ -21,8 +21,15 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Item represents an entry that gets inserted into the storage packer
type Item struct { type Item struct {
ID string `sentinel:"" protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // ID must be provided by the caller; the same value, if used with GetItem,
// can be used to fetch the item. However, when iterating through a bucket,
// this ID will be an internal ID. In other words, outside of the use-case
// described above, the caller *must not* rely on this value to be
// consistent with what they passed in.
ID string `sentinel:"" protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// message is the contents of the item
Message *any.Any `sentinel:"" protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` Message *any.Any `sentinel:"" protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
@@ -68,12 +75,22 @@ func (m *Item) GetMessage() *any.Any {
return nil return nil
} }
// Bucket is a construct to hold multiple items within itself. This
// abstraction contains multiple buckets of the same kind within itself and
// shares among them the items that get inserted. When the bucket as a whole
// gets too big to hold more items, the contained buckets get pushed out only
// to become independent buckets. Hence, this can grow infinitely in terms of
// storage space for items that get inserted.
type Bucket struct { type Bucket struct {
Key string `sentinel:"" protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` // Key is the storage path where the bucket gets stored
Items []*Item `sentinel:"" protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"` Key string `sentinel:"" protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` // Items holds the items contained within this bucket. Used by v1.
XXX_unrecognized []byte `json:"-"` Items []*Item `sentinel:"" protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
XXX_sizecache int32 `json:"-"` // ItemMap stores a mapping of item ID to message. Used by v2.
ItemMap map[string]*any.Any `sentinel:"" protobuf:"bytes,3,rep,name=item_map,json=itemMap,proto3" json:"item_map,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
} }
func (m *Bucket) Reset() { *m = Bucket{} } func (m *Bucket) Reset() { *m = Bucket{} }
@@ -115,27 +132,39 @@ func (m *Bucket) GetItems() []*Item {
return nil return nil
} }
func (m *Bucket) GetItemMap() map[string]*any.Any {
if m != nil {
return m.ItemMap
}
return nil
}
func init() { func init() {
proto.RegisterType((*Item)(nil), "storagepacker.Item") proto.RegisterType((*Item)(nil), "storagepacker.Item")
proto.RegisterType((*Bucket)(nil), "storagepacker.Bucket") proto.RegisterType((*Bucket)(nil), "storagepacker.Bucket")
proto.RegisterMapType((map[string]*any.Any)(nil), "storagepacker.Bucket.ItemMapEntry")
} }
func init() { proto.RegisterFile("helper/storagepacker/types.proto", fileDescriptor_c0e98c66c4f51b7f) } func init() { proto.RegisterFile("helper/storagepacker/types.proto", fileDescriptor_c0e98c66c4f51b7f) }
var fileDescriptor_c0e98c66c4f51b7f = []byte{ var fileDescriptor_c0e98c66c4f51b7f = []byte{
// 219 bytes of a gzipped FileDescriptorProto // 276 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x8f, 0x41, 0x4b, 0xc3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xcf, 0x4b, 0xc3, 0x30,
0x10, 0x85, 0x49, 0xaa, 0x15, 0xb7, 0x28, 0xb2, 0x7a, 0x88, 0x9e, 0x42, 0x4f, 0xf1, 0x32, 0x83, 0x14, 0xc7, 0x69, 0xeb, 0x36, 0x7d, 0x53, 0x91, 0xe8, 0xa1, 0xee, 0x54, 0x7a, 0xaa, 0x1e, 0x12,
0xf5, 0x17, 0x58, 0x50, 0xf0, 0x9a, 0xa3, 0xb7, 0x4d, 0x3a, 0x6e, 0x96, 0x64, 0xbb, 0xcb, 0xee, 0x9c, 0x17, 0x11, 0x3c, 0x38, 0x50, 0xf0, 0x20, 0x48, 0x8f, 0x5e, 0x24, 0xed, 0x9e, 0x6d, 0xe8,
0xac, 0xb0, 0xff, 0x5e, 0xda, 0xd8, 0x43, 0xc1, 0xdb, 0xc0, 0xfb, 0xf8, 0xe6, 0x3d, 0x51, 0x0f, 0x8f, 0x84, 0x24, 0x1d, 0xf4, 0x1f, 0xf5, 0xef, 0x91, 0x36, 0x0e, 0x9c, 0x0c, 0x6f, 0x2f, 0x7c,
0x34, 0x79, 0x0a, 0x18, 0xd9, 0x05, 0xa5, 0xc9, 0xab, 0x7e, 0xa4, 0x80, 0x9c, 0x3d, 0x45, 0xf0, 0x3f, 0xf9, 0xe4, 0x1b, 0x1e, 0x44, 0x25, 0xd6, 0x0a, 0x35, 0x33, 0x56, 0x6a, 0x5e, 0xa0, 0xe2,
0xc1, 0xb1, 0x93, 0x37, 0x67, 0xd1, 0xd3, 0xa3, 0x76, 0x4e, 0x4f, 0x84, 0xc7, 0xb0, 0x4b, 0xdf, 0x79, 0x85, 0x9a, 0xd9, 0x5e, 0xa1, 0xa1, 0x4a, 0x4b, 0x2b, 0xc9, 0xc9, 0x4e, 0xb4, 0xb8, 0x2c,
0xa8, 0xf6, 0x79, 0x26, 0xd7, 0x1f, 0xe2, 0xe2, 0x93, 0xc9, 0xca, 0x5b, 0x51, 0x9a, 0x5d, 0x55, 0xa4, 0x2c, 0x6a, 0x64, 0x63, 0x98, 0x75, 0x9f, 0x8c, 0xb7, 0xbd, 0x23, 0xe3, 0x67, 0x38, 0x78,
0xd4, 0x45, 0x73, 0xdd, 0x96, 0x66, 0x27, 0x41, 0x5c, 0x59, 0x8a, 0x51, 0x69, 0xaa, 0xca, 0xba, 0xb1, 0xd8, 0x90, 0x53, 0xf0, 0xc5, 0x3a, 0xf4, 0x22, 0x2f, 0x39, 0x4a, 0x7d, 0xb1, 0x26, 0x14,
0x68, 0x56, 0x9b, 0x07, 0x98, 0x25, 0x70, 0x92, 0xc0, 0xdb, 0x3e, 0xb7, 0x27, 0x68, 0xfd, 0x2e, 0x66, 0x0d, 0x1a, 0xc3, 0x0b, 0x0c, 0xfd, 0xc8, 0x4b, 0xe6, 0xcb, 0x0b, 0xea, 0x24, 0x74, 0x2b,
0x96, 0xdb, 0xd4, 0x8f, 0xc4, 0xf2, 0x4e, 0x2c, 0x46, 0xca, 0x7f, 0xaa, 0xc3, 0x29, 0x9f, 0xc5, 0xa1, 0x8f, 0x6d, 0x9f, 0x6e, 0xa1, 0xf8, 0xcb, 0x83, 0xe9, 0xaa, 0xcb, 0x2b, 0xb4, 0xe4, 0x0c,
0xa5, 0x61, 0xb2, 0xb1, 0x2a, 0xeb, 0x45, 0xb3, 0xda, 0xdc, 0xc3, 0x59, 0x3b, 0x38, 0xfc, 0x6f, 0x82, 0x0a, 0xfb, 0x1f, 0xd7, 0x30, 0x92, 0x2b, 0x98, 0x08, 0x8b, 0x8d, 0x09, 0xfd, 0x28, 0x48,
0x67, 0x62, 0xfb, 0xf2, 0x85, 0xda, 0xf0, 0x90, 0x3a, 0xe8, 0x9d, 0xc5, 0x41, 0xc5, 0xc1, 0xf4, 0xe6, 0xcb, 0x73, 0xba, 0x53, 0x8f, 0x0e, 0x05, 0x52, 0x47, 0x90, 0x07, 0x38, 0x1c, 0x86, 0x8f,
0x2e, 0x78, 0xfc, 0x51, 0x69, 0x62, 0xfc, 0x6f, 0x77, 0xb7, 0x3c, 0x16, 0x7a, 0xfd, 0x0d, 0x00, 0x86, 0xab, 0x30, 0x18, 0xe9, 0xf8, 0x0f, 0xed, 0x5e, 0x19, 0x2f, 0xbd, 0x72, 0xf5, 0xd4, 0x5a,
0x00, 0xff, 0xff, 0x1c, 0x8e, 0xb4, 0xa9, 0x16, 0x01, 0x00, 0x00, 0xdd, 0xa7, 0x33, 0xe1, 0x4e, 0x8b, 0x37, 0x38, 0xfe, 0x1d, 0xec, 0xe9, 0x72, 0x0d, 0x93, 0x0d,
0xaf, 0xbb, 0xff, 0xbf, 0xe5, 0x90, 0x7b, 0xff, 0xce, 0x5b, 0xdd, 0xbc, 0xb3, 0x42, 0xd8, 0xb2,
0xcb, 0x68, 0x2e, 0x1b, 0x56, 0x72, 0x53, 0x8a, 0x5c, 0x6a, 0xc5, 0x36, 0xbc, 0xab, 0x2d, 0xdb,
0xb7, 0x89, 0x6c, 0x3a, 0xba, 0x6e, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x46, 0x9d, 0x8a, 0xcb,
0xa8, 0x01, 0x00, 0x00,
} }

View File

@@ -6,12 +6,29 @@ package storagepacker;
import "google/protobuf/any.proto"; import "google/protobuf/any.proto";
// Item represents an entry that gets inserted into the storage packer
message Item { message Item {
string id = 1; // ID must be provided by the caller; the same value, if used with GetItem,
google.protobuf.Any message = 2; // can be used to fetch the item. However, when iterating through a bucket,
// this ID will be an internal ID. In other words, outside of the use-case
// described above, the caller *must not* rely on this value to be
// consistent with what they passed in.
string id = 1;
// message is the contents of the item
google.protobuf.Any message = 2;
} }
// Bucket is a construct to hold multiple items within itself. This
// abstraction contains multiple buckets of the same kind within itself and
// shares among them the items that get inserted. When the bucket as a whole
// gets too big to hold more items, the contained buckets get pushed out only
// to become independent buckets. Hence, this can grow infinitely in terms of
// storage space for items that get inserted.
message Bucket { message Bucket {
string key = 1; // Key is the storage path where the bucket gets stored
repeated Item items = 2; string key = 1;
// Items holds the items contained within this bucket. Used by v1.
repeated Item items = 2;
// ItemMap stores a mapping of item ID to message. Used by v2.
map <string, google.protobuf.Any> item_map = 3;
} }

View File

@@ -401,6 +401,9 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt
ok, resp, _, err := c.kv.Txn(ops, queryOpts) ok, resp, _, err := c.kv.Txn(ops, queryOpts)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "is too large") {
return errwrap.Wrapf(fmt.Sprintf("%s: {{err}}", physical.ErrValueTooLarge), err)
}
return err return err
} }
if ok && len(resp.Errors) == 0 { if ok && len(resp.Errors) == 0 {
@@ -431,7 +434,13 @@ func (c *ConsulBackend) Put(ctx context.Context, entry *physical.Entry) error {
writeOpts = writeOpts.WithContext(ctx) writeOpts = writeOpts.WithContext(ctx)
_, err := c.kv.Put(pair, writeOpts) _, err := c.kv.Put(pair, writeOpts)
return err if err != nil {
if strings.Contains(err.Error(), "Value exceeds") {
return errwrap.Wrapf(fmt.Sprintf("%s: {{err}}", physical.ErrValueTooLarge), err)
}
return err
}
return nil
} }
// Get is used to fetch an entry // Get is used to fetch an entry

View File

@@ -1,10 +1,12 @@
package consul package consul
import ( import (
"context"
"fmt" "fmt"
"math/rand" "math/rand"
"os" "os"
"reflect" "reflect"
"strings"
"sync" "sync"
"testing" "testing"
"time" "time"
@@ -20,8 +22,7 @@ import (
type consulConf map[string]string type consulConf map[string]string
var ( var (
addrCount int = 0 addrCount int = 0
testImagePull sync.Once
) )
func testConsulBackend(t *testing.T) *ConsulBackend { func testConsulBackend(t *testing.T) *ConsulBackend {
@@ -488,7 +489,7 @@ func TestConsulBackend(t *testing.T) {
consulToken := os.Getenv("CONSUL_HTTP_TOKEN") consulToken := os.Getenv("CONSUL_HTTP_TOKEN")
addr := os.Getenv("CONSUL_HTTP_ADDR") addr := os.Getenv("CONSUL_HTTP_ADDR")
if addr == "" { if addr == "" {
cleanup, connURL, token := consul.PrepareTestContainer(t, "1.4.0-rc1") cleanup, connURL, token := consul.PrepareTestContainer(t, "1.4.4")
defer cleanup() defer cleanup()
addr, consulToken = connURL, token addr, consulToken = connURL, token
} }
@@ -522,11 +523,82 @@ func TestConsulBackend(t *testing.T) {
physical.ExerciseBackend_ListPrefix(t, b) physical.ExerciseBackend_ListPrefix(t, b)
} }
func TestConsul_TooLarge(t *testing.T) {
consulToken := os.Getenv("CONSUL_HTTP_TOKEN")
addr := os.Getenv("CONSUL_HTTP_ADDR")
if addr == "" {
cleanup, connURL, token := consul.PrepareTestContainer(t, "1.4.4")
defer cleanup()
addr, consulToken = connURL, token
}
conf := api.DefaultConfig()
conf.Address = addr
conf.Token = consulToken
client, err := api.NewClient(conf)
if err != nil {
t.Fatalf("err: %v", err)
}
randPath := fmt.Sprintf("vault-%d/", time.Now().Unix())
defer func() {
client.KV().DeleteTree(randPath, nil)
}()
logger := logging.NewVaultLogger(log.Debug)
b, err := NewConsulBackend(map[string]string{
"address": conf.Address,
"path": randPath,
"max_parallel": "256",
"token": conf.Token,
}, logger)
if err != nil {
t.Fatalf("err: %s", err)
}
zeros := make([]byte, 600000, 600000)
n, err := rand.Read(zeros)
if n != 600000 {
t.Fatalf("expected 500k zeros, read %d", n)
}
if err != nil {
t.Fatal(err)
}
err = b.Put(context.Background(), &physical.Entry{
Key: "foo",
Value: zeros,
})
if err == nil {
t.Fatal("expected error")
}
if !strings.Contains(err.Error(), physical.ErrValueTooLarge) {
t.Fatalf("expected value too large error, got %v", err)
}
err = b.(physical.Transactional).Transaction(context.Background(), []*physical.TxnEntry{
{
Operation: physical.PutOperation,
Entry: &physical.Entry{
Key: "foo",
Value: zeros,
},
},
})
if err == nil {
t.Fatal("expected error")
}
if !strings.Contains(err.Error(), physical.ErrValueTooLarge) {
t.Fatalf("expected value too large error, got %v", err)
}
}
func TestConsulHABackend(t *testing.T) { func TestConsulHABackend(t *testing.T) {
consulToken := os.Getenv("CONSUL_HTTP_TOKEN") consulToken := os.Getenv("CONSUL_HTTP_TOKEN")
addr := os.Getenv("CONSUL_HTTP_ADDR") addr := os.Getenv("CONSUL_HTTP_ADDR")
if addr == "" { if addr == "" {
cleanup, connURL, token := consul.PrepareTestContainer(t, "1.4.0-rc1") cleanup, connURL, token := consul.PrepareTestContainer(t, "1.4.4")
defer cleanup() defer cleanup()
addr, consulToken = connURL, token addr, consulToken = connURL, token
} }

View File

@@ -3,7 +3,9 @@ package inmem
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"os" "os"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -34,14 +36,15 @@ var (
// expected to be durable. // expected to be durable.
type InmemBackend struct { type InmemBackend struct {
sync.RWMutex sync.RWMutex
root *radix.Tree root *radix.Tree
permitPool *physical.PermitPool permitPool *physical.PermitPool
logger log.Logger logger log.Logger
failGet *uint32 failGet *uint32
failPut *uint32 failPut *uint32
failDelete *uint32 failDelete *uint32
failList *uint32 failList *uint32
logOps bool logOps bool
maxValueSize int
} }
type TransactionalInmemBackend struct { type TransactionalInmemBackend struct {
@@ -49,36 +52,56 @@ type TransactionalInmemBackend struct {
} }
// NewInmem constructs a new in-memory backend // NewInmem constructs a new in-memory backend
func NewInmem(_ map[string]string, logger log.Logger) (physical.Backend, error) { func NewInmem(conf map[string]string, logger log.Logger) (physical.Backend, error) {
in := &InmemBackend{ maxValueSize := 0
root: radix.New(), maxValueSizeStr, ok := conf["max_value_size"]
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations), if ok {
logger: logger, var err error
failGet: new(uint32), maxValueSize, err = strconv.Atoi(maxValueSizeStr)
failPut: new(uint32), if err != nil {
failDelete: new(uint32), return nil, err
failList: new(uint32), }
logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "",
} }
return in, nil
return &InmemBackend{
root: radix.New(),
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
logger: logger,
failGet: new(uint32),
failPut: new(uint32),
failDelete: new(uint32),
failList: new(uint32),
logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "",
maxValueSize: maxValueSize,
}, nil
} }
// Basically for now just creates a permit pool of size 1 so only one operation // Basically for now just creates a permit pool of size 1 so only one operation
// can run at a time // can run at a time
func NewTransactionalInmem(_ map[string]string, logger log.Logger) (physical.Backend, error) { func NewTransactionalInmem(conf map[string]string, logger log.Logger) (physical.Backend, error) {
in := &TransactionalInmemBackend{ maxValueSize := 0
InmemBackend: InmemBackend{ maxValueSizeStr, ok := conf["max_value_size"]
root: radix.New(), if ok {
permitPool: physical.NewPermitPool(1), var err error
logger: logger, maxValueSize, err = strconv.Atoi(maxValueSizeStr)
failGet: new(uint32), if err != nil {
failPut: new(uint32), return nil, err
failDelete: new(uint32), }
failList: new(uint32),
logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "",
},
} }
return in, nil
return &TransactionalInmemBackend{
InmemBackend: InmemBackend{
root: radix.New(),
permitPool: physical.NewPermitPool(1),
logger: logger,
failGet: new(uint32),
failPut: new(uint32),
failDelete: new(uint32),
failList: new(uint32),
logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "",
maxValueSize: maxValueSize,
},
}, nil
} }
// Put is used to insert or update an entry // Put is used to insert or update an entry
@@ -106,6 +129,10 @@ func (i *InmemBackend) PutInternal(ctx context.Context, entry *physical.Entry) e
default: default:
} }
if i.maxValueSize > 0 && len(entry.Value) > i.maxValueSize {
return fmt.Errorf("%s", physical.ErrValueTooLarge)
}
i.root.Insert(entry.Key, entry.Value) i.root.Insert(entry.Key, entry.Value)
return nil return nil
} }

View File

@@ -20,6 +20,10 @@ const (
PutOperation = "put" PutOperation = "put"
) )
const (
ErrValueTooLarge = "put failed due to value being too large"
)
// ShutdownSignal // ShutdownSignal
type ShutdownChannel chan struct{} type ShutdownChannel chan struct{}

View File

@@ -1001,7 +1001,7 @@ func (m *ExpirationManager) RenewToken(ctx context.Context, req *logical.Request
if resp.Auth.EntityID != "" && if resp.Auth.EntityID != "" &&
resp.Auth.GroupAliases != nil && resp.Auth.GroupAliases != nil &&
m.core.identityStore != nil { m.core.identityStore != nil {
validAliases, err := m.core.identityStore.refreshExternalGroupMembershipsByEntityID(resp.Auth.EntityID, resp.Auth.GroupAliases) validAliases, err := m.core.identityStore.refreshExternalGroupMembershipsByEntityID(ctx, resp.Auth.EntityID, resp.Auth.GroupAliases)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -6,6 +6,7 @@ import (
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/api" "github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/builtin/credential/ldap" "github.com/hashicorp/vault/builtin/credential/ldap"
"github.com/hashicorp/vault/helper/namespace"
vaulthttp "github.com/hashicorp/vault/http" vaulthttp "github.com/hashicorp/vault/http"
"github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault" "github.com/hashicorp/vault/vault"
@@ -235,7 +236,9 @@ func TestIdentityStore_Integ_GroupAliases(t *testing.T) {
// Remove its member entities // Remove its member entities
group.MemberEntityIDs = nil group.MemberEntityIDs = nil
err = identityStore.UpsertGroup(group, true) ctx := namespace.RootContext(nil)
err = identityStore.UpsertGroup(ctx, group, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -256,7 +259,7 @@ func TestIdentityStore_Integ_GroupAliases(t *testing.T) {
// Remove its member entities // Remove its member entities
group.MemberEntityIDs = nil group.MemberEntityIDs = nil
err = identityStore.UpsertGroup(group, true) err = identityStore.UpsertGroup(ctx, group, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -277,7 +280,7 @@ func TestIdentityStore_Integ_GroupAliases(t *testing.T) {
// Remove its member entities // Remove its member entities
group.MemberEntityIDs = nil group.MemberEntityIDs = nil
err = identityStore.UpsertGroup(group, true) err = identityStore.UpsertGroup(ctx, group, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -251,7 +251,7 @@ func (i *IdentityStore) Invalidate(ctx context.Context, key string) {
} }
// Only update MemDB and don't touch the storage // Only update MemDB and don't touch the storage
err = i.UpsertGroupInTxn(txn, group, false) err = i.UpsertGroupInTxn(ctx, txn, group, false)
if err != nil { if err != nil {
i.logger.Error("failed to update group in MemDB", "error", err) i.logger.Error("failed to update group in MemDB", "error", err)
return return
@@ -336,7 +336,7 @@ func (i *IdentityStore) parseEntityFromBucketItem(ctx context.Context, item *sto
} }
// Store the entity with new format // Store the entity with new format
err = i.entityPacker.PutItem(item) err = i.entityPacker.PutItem(ctx, item)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -406,7 +406,7 @@ func (i *IdentityStore) pathAliasIDDelete() framework.OperationFunc {
Message: entityAsAny, Message: entityAsAny,
} }
err = i.entityPacker.PutItem(item) err = i.entityPacker.PutItem(ctx, item)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -262,7 +262,8 @@ func (i *IdentityStore) handleEntityUpdateCommon() framework.OperationFunc {
// Prepare the response // Prepare the response
respData := map[string]interface{}{ respData := map[string]interface{}{
"id": entity.ID, "id": entity.ID,
"name": entity.Name,
} }
var aliasIDs []string var aliasIDs []string
@@ -490,7 +491,7 @@ func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb
for _, group := range groups { for _, group := range groups {
group.MemberEntityIDs = strutil.StrListDelete(group.MemberEntityIDs, entity.ID) group.MemberEntityIDs = strutil.StrListDelete(group.MemberEntityIDs, entity.ID)
err = i.UpsertGroupInTxn(txn, group, true) err = i.UpsertGroupInTxn(ctx, txn, group, true)
if err != nil { if err != nil {
return err return err
} }
@@ -509,7 +510,7 @@ func (i *IdentityStore) handleEntityDeleteCommon(ctx context.Context, txn *memdb
} }
// Delete the entity from storage // Delete the entity from storage
err = i.entityPacker.DeleteItem(entity.ID) err = i.entityPacker.DeleteItem(ctx, entity.ID)
if err != nil { if err != nil {
return err return err
} }
@@ -708,7 +709,7 @@ func (i *IdentityStore) mergeEntity(ctx context.Context, txn *memdb.Txn, toEntit
if persist && !isPerfSecondaryOrStandby { if persist && !isPerfSecondaryOrStandby {
// Delete the entity which we are merging from in storage // Delete the entity which we are merging from in storage
err = i.entityPacker.DeleteItem(fromEntity.ID) err = i.entityPacker.DeleteItem(ctx, fromEntity.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -732,7 +733,7 @@ func (i *IdentityStore) mergeEntity(ctx context.Context, txn *memdb.Txn, toEntit
Message: toEntityAsAny, Message: toEntityAsAny,
} }
err = i.entityPacker.PutItem(item) err = i.entityPacker.PutItem(ctx, item)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -294,7 +294,7 @@ func (i *IdentityStore) pathGroupAliasIDDelete() framework.OperationFunc {
// Delete the alias // Delete the alias
group.Alias = nil group.Alias = nil
err = i.UpsertGroupInTxn(txn, group, true) err = i.UpsertGroupInTxn(ctx, txn, group, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -443,7 +443,7 @@ func (i *IdentityStore) handleGroupDeleteCommon(ctx context.Context, key string,
} }
// Delete the group from storage // Delete the group from storage
err = i.groupPacker.DeleteItem(group.ID) err = i.groupPacker.DeleteItem(ctx, group.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -100,8 +100,10 @@ func TestIdentityStore_GroupEntityMembershipUpgrade(t *testing.T) {
// Manually add an invalid entity as the group's member // Manually add an invalid entity as the group's member
group.MemberEntityIDs = []string{"invalidentityid"} group.MemberEntityIDs = []string{"invalidentityid"}
ctx := namespace.RootContext(nil)
// Persist the group // Persist the group
err = c.identityStore.UpsertGroupInTxn(txn, group, true) err = c.identityStore.UpsertGroupInTxn(ctx, txn, group, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -86,7 +86,9 @@ func TestIdentityStore_UnsealingWhenConflictingAliasNames(t *testing.T) {
ID: entity2.ID, ID: entity2.ID,
Message: entity2Any, Message: entity2Any,
} }
if err = c.identityStore.entityPacker.PutItem(item); err != nil {
ctx := namespace.RootContext(nil)
if err = c.identityStore.entityPacker.PutItem(ctx, item); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -115,7 +115,7 @@ func (i *IdentityStore) loadGroups(ctx context.Context) error {
// Group's namespace doesn't exist anymore but the group // Group's namespace doesn't exist anymore but the group
// from the namespace still exists. // from the namespace still exists.
i.logger.Warn("deleting group and its any existing aliases", "name", group.Name, "namespace_id", group.NamespaceID) i.logger.Warn("deleting group and its any existing aliases", "name", group.Name, "namespace_id", group.NamespaceID)
err = i.groupPacker.DeleteItem(group.ID) err = i.groupPacker.DeleteItem(ctx, group.ID)
if err != nil { if err != nil {
return err return err
} }
@@ -158,7 +158,7 @@ func (i *IdentityStore) loadGroups(ctx context.Context) error {
} }
} }
err = i.UpsertGroupInTxn(txn, group, persist) err = i.UpsertGroupInTxn(ctx, txn, group, persist)
if err != nil { if err != nil {
txn.Abort() txn.Abort()
return errwrap.Wrapf("failed to update group in memdb: {{err}}", err) return errwrap.Wrapf("failed to update group in memdb: {{err}}", err)
@@ -282,7 +282,7 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
// Entity's namespace doesn't exist anymore but the // Entity's namespace doesn't exist anymore but the
// entity from the namespace still exists. // entity from the namespace still exists.
i.logger.Warn("deleting entity and its any existing aliases", "name", entity.Name, "namespace_id", entity.NamespaceID) i.logger.Warn("deleting entity and its any existing aliases", "name", entity.Name, "namespace_id", entity.NamespaceID)
err = i.entityPacker.DeleteItem(entity.ID) err = i.entityPacker.DeleteItem(ctx, entity.ID)
if err != nil { if err != nil {
return err return err
} }
@@ -419,7 +419,7 @@ func (i *IdentityStore) upsertEntityInTxn(ctx context.Context, txn *memdb.Txn, e
if err != nil { if err != nil {
return err return err
} }
err = i.entityPacker.PutItem(&storagepacker.Item{ err = i.entityPacker.PutItem(ctx, &storagepacker.Item{
ID: previousEntity.ID, ID: previousEntity.ID,
Message: marshaledPreviousEntity, Message: marshaledPreviousEntity,
}) })
@@ -446,7 +446,7 @@ func (i *IdentityStore) upsertEntityInTxn(ctx context.Context, txn *memdb.Txn, e
} }
// Persist the entity object // Persist the entity object
err = i.entityPacker.PutItem(item) err = i.entityPacker.PutItem(ctx, item)
if err != nil { if err != nil {
return err return err
} }
@@ -1128,7 +1128,7 @@ func (i *IdentityStore) sanitizeAndUpsertGroup(ctx context.Context, group *ident
// Remove group ID from the parent group IDs // Remove group ID from the parent group IDs
currentMemberGroup.ParentGroupIDs = strutil.StrListDelete(currentMemberGroup.ParentGroupIDs, group.ID) currentMemberGroup.ParentGroupIDs = strutil.StrListDelete(currentMemberGroup.ParentGroupIDs, group.ID)
err = i.UpsertGroupInTxn(txn, currentMemberGroup, true) err = i.UpsertGroupInTxn(ctx, txn, currentMemberGroup, true)
if err != nil { if err != nil {
return err return err
} }
@@ -1184,7 +1184,7 @@ func (i *IdentityStore) sanitizeAndUpsertGroup(ctx context.Context, group *ident
// This technically is not upsert. It is only update, only the method // This technically is not upsert. It is only update, only the method
// name is upsert here. // name is upsert here.
err = i.UpsertGroupInTxn(txn, memberGroup, true) err = i.UpsertGroupInTxn(ctx, txn, memberGroup, true)
if err != nil { if err != nil {
// Ideally we would want to revert the whole operation in case of // Ideally we would want to revert the whole operation in case of
// errors while persisting in member groups. But there is no // errors while persisting in member groups. But there is no
@@ -1204,7 +1204,7 @@ ALIAS:
} }
} }
err = i.UpsertGroupInTxn(txn, group, true) err = i.UpsertGroupInTxn(ctx, txn, group, true)
if err != nil { if err != nil {
return err return err
} }
@@ -1331,11 +1331,11 @@ func (i *IdentityStore) MemDBGroupByName(ctx context.Context, groupName string,
return i.MemDBGroupByNameInTxn(ctx, txn, groupName, clone) return i.MemDBGroupByNameInTxn(ctx, txn, groupName, clone)
} }
func (i *IdentityStore) UpsertGroup(group *identity.Group, persist bool) error { func (i *IdentityStore) UpsertGroup(ctx context.Context, group *identity.Group, persist bool) error {
txn := i.db.Txn(true) txn := i.db.Txn(true)
defer txn.Abort() defer txn.Abort()
err := i.UpsertGroupInTxn(txn, group, true) err := i.UpsertGroupInTxn(ctx, txn, group, true)
if err != nil { if err != nil {
return err return err
} }
@@ -1345,7 +1345,7 @@ func (i *IdentityStore) UpsertGroup(group *identity.Group, persist bool) error {
return nil return nil
} }
func (i *IdentityStore) UpsertGroupInTxn(txn *memdb.Txn, group *identity.Group, persist bool) error { func (i *IdentityStore) UpsertGroupInTxn(ctx context.Context, txn *memdb.Txn, group *identity.Group, persist bool) error {
var err error var err error
if txn == nil { if txn == nil {
@@ -1401,7 +1401,7 @@ func (i *IdentityStore) UpsertGroupInTxn(txn *memdb.Txn, group *identity.Group,
return err return err
} }
if !sent { if !sent {
if err := i.groupPacker.PutItem(item); err != nil { if err := i.groupPacker.PutItem(ctx, item); err != nil {
return err return err
} }
} }
@@ -1845,7 +1845,7 @@ func (i *IdentityStore) MemDBGroupByAliasID(aliasID string, clone bool) (*identi
return i.MemDBGroupByAliasIDInTxn(txn, aliasID, clone) return i.MemDBGroupByAliasIDInTxn(txn, aliasID, clone)
} }
func (i *IdentityStore) refreshExternalGroupMembershipsByEntityID(entityID string, groupAliases []*logical.Alias) ([]*logical.Alias, error) { func (i *IdentityStore) refreshExternalGroupMembershipsByEntityID(ctx context.Context, entityID string, groupAliases []*logical.Alias) ([]*logical.Alias, error) {
i.logger.Debug("refreshing external group memberships", "entity_id", entityID, "group_aliases", groupAliases) i.logger.Debug("refreshing external group memberships", "entity_id", entityID, "group_aliases", groupAliases)
if entityID == "" { if entityID == "" {
return nil, fmt.Errorf("empty entity ID") return nil, fmt.Errorf("empty entity ID")
@@ -1901,7 +1901,7 @@ func (i *IdentityStore) refreshExternalGroupMembershipsByEntityID(entityID strin
group.MemberEntityIDs = append(group.MemberEntityIDs, entityID) group.MemberEntityIDs = append(group.MemberEntityIDs, entityID)
err = i.UpsertGroupInTxn(txn, group, true) err = i.UpsertGroupInTxn(ctx, txn, group, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -1923,7 +1923,7 @@ func (i *IdentityStore) refreshExternalGroupMembershipsByEntityID(entityID strin
group.MemberEntityIDs = strutil.StrListDelete(group.MemberEntityIDs, entityID) group.MemberEntityIDs = strutil.StrListDelete(group.MemberEntityIDs, entityID)
err = i.UpsertGroupInTxn(txn, group, true) err = i.UpsertGroupInTxn(ctx, txn, group, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -1071,7 +1071,7 @@ func (c *Core) handleLoginRequest(ctx context.Context, req *logical.Request) (re
auth.EntityID = entity.ID auth.EntityID = entity.ID
if auth.GroupAliases != nil { if auth.GroupAliases != nil {
validAliases, err := c.identityStore.refreshExternalGroupMembershipsByEntityID(auth.EntityID, auth.GroupAliases) validAliases, err := c.identityStore.refreshExternalGroupMembershipsByEntityID(ctx, auth.EntityID, auth.GroupAliases)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@@ -3,7 +3,9 @@ package inmem
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"os" "os"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -34,14 +36,15 @@ var (
// expected to be durable. // expected to be durable.
type InmemBackend struct { type InmemBackend struct {
sync.RWMutex sync.RWMutex
root *radix.Tree root *radix.Tree
permitPool *physical.PermitPool permitPool *physical.PermitPool
logger log.Logger logger log.Logger
failGet *uint32 failGet *uint32
failPut *uint32 failPut *uint32
failDelete *uint32 failDelete *uint32
failList *uint32 failList *uint32
logOps bool logOps bool
maxValueSize int
} }
type TransactionalInmemBackend struct { type TransactionalInmemBackend struct {
@@ -49,36 +52,56 @@ type TransactionalInmemBackend struct {
} }
// NewInmem constructs a new in-memory backend // NewInmem constructs a new in-memory backend
func NewInmem(_ map[string]string, logger log.Logger) (physical.Backend, error) { func NewInmem(conf map[string]string, logger log.Logger) (physical.Backend, error) {
in := &InmemBackend{ maxValueSize := 0
root: radix.New(), maxValueSizeStr, ok := conf["max_value_size"]
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations), if ok {
logger: logger, var err error
failGet: new(uint32), maxValueSize, err = strconv.Atoi(maxValueSizeStr)
failPut: new(uint32), if err != nil {
failDelete: new(uint32), return nil, err
failList: new(uint32), }
logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "",
} }
return in, nil
return &InmemBackend{
root: radix.New(),
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
logger: logger,
failGet: new(uint32),
failPut: new(uint32),
failDelete: new(uint32),
failList: new(uint32),
logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "",
maxValueSize: maxValueSize,
}, nil
} }
// Basically for now just creates a permit pool of size 1 so only one operation // Basically for now just creates a permit pool of size 1 so only one operation
// can run at a time // can run at a time
func NewTransactionalInmem(_ map[string]string, logger log.Logger) (physical.Backend, error) { func NewTransactionalInmem(conf map[string]string, logger log.Logger) (physical.Backend, error) {
in := &TransactionalInmemBackend{ maxValueSize := 0
InmemBackend: InmemBackend{ maxValueSizeStr, ok := conf["max_value_size"]
root: radix.New(), if ok {
permitPool: physical.NewPermitPool(1), var err error
logger: logger, maxValueSize, err = strconv.Atoi(maxValueSizeStr)
failGet: new(uint32), if err != nil {
failPut: new(uint32), return nil, err
failDelete: new(uint32), }
failList: new(uint32),
logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "",
},
} }
return in, nil
return &TransactionalInmemBackend{
InmemBackend: InmemBackend{
root: radix.New(),
permitPool: physical.NewPermitPool(1),
logger: logger,
failGet: new(uint32),
failPut: new(uint32),
failDelete: new(uint32),
failList: new(uint32),
logOps: os.Getenv("VAULT_INMEM_LOG_ALL_OPS") != "",
maxValueSize: maxValueSize,
},
}, nil
} }
// Put is used to insert or update an entry // Put is used to insert or update an entry
@@ -106,6 +129,10 @@ func (i *InmemBackend) PutInternal(ctx context.Context, entry *physical.Entry) e
default: default:
} }
if i.maxValueSize > 0 && len(entry.Value) > i.maxValueSize {
return fmt.Errorf("%s", physical.ErrValueTooLarge)
}
i.root.Insert(entry.Key, entry.Value) i.root.Insert(entry.Key, entry.Value)
return nil return nil
} }

View File

@@ -20,6 +20,10 @@ const (
PutOperation = "put" PutOperation = "put"
) )
const (
ErrValueTooLarge = "put failed due to value being too large"
)
// ShutdownSignal // ShutdownSignal
type ShutdownChannel chan struct{} type ShutdownChannel chan struct{}