agent: tolerate partial restore failure from persistent cache (#12718)

* agent: tolerate partial restore failure from persistent cache

* Review comments: improved consistency, test robustness, comments, assertions
This commit is contained in:
Tom Proctor
2021-10-08 11:30:04 +01:00
committed by GitHub
parent 7f42e01682
commit 67069f011f
4 changed files with 132 additions and 66 deletions

View File

@@ -1,4 +1,3 @@
+```release-note:improvement ```release-note:improvement
+auth/kubernetes: validate JWT against the provided role on alias look ahead operations auth/kubernetes: validate JWT against the provided role on alias look ahead operations
+``` ```

3
changelog/12718.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:improvement
agent/cache: tolerate partial restore failure from persistent cache
```

View File

@@ -17,6 +17,7 @@ import (
"time" "time"
hclog "github.com/hashicorp/go-hclog" hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-secure-stdlib/base62" "github.com/hashicorp/go-secure-stdlib/base62"
"github.com/hashicorp/vault/api" "github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/command/agent/cache/cacheboltdb" "github.com/hashicorp/vault/command/agent/cache/cacheboltdb"
@@ -969,56 +970,69 @@ func (c *LeaseCache) Flush() error {
// tokens first, since restoring a lease's renewal context and watcher requires // tokens first, since restoring a lease's renewal context and watcher requires
// looking up the token in the cachememdb. // looking up the token in the cachememdb.
func (c *LeaseCache) Restore(ctx context.Context, storage *cacheboltdb.BoltStorage) error { func (c *LeaseCache) Restore(ctx context.Context, storage *cacheboltdb.BoltStorage) error {
var errors *multierror.Error
// Process tokens first // Process tokens first
tokens, err := storage.GetByType(ctx, cacheboltdb.TokenType) tokens, err := storage.GetByType(ctx, cacheboltdb.TokenType)
if err != nil { if err != nil {
return err errors = multierror.Append(errors, err)
} } else {
if err := c.restoreTokens(tokens); err != nil { if err := c.restoreTokens(tokens); err != nil {
return err errors = multierror.Append(errors, err)
}
} }
// Then process auth leases // Then process auth leases
authLeases, err := storage.GetByType(ctx, cacheboltdb.AuthLeaseType) authLeases, err := storage.GetByType(ctx, cacheboltdb.AuthLeaseType)
if err != nil { if err != nil {
return err errors = multierror.Append(errors, err)
} } else {
if err := c.restoreLeases(authLeases); err != nil { if err := c.restoreLeases(authLeases); err != nil {
return err errors = multierror.Append(errors, err)
}
} }
// Then process secret leases // Then process secret leases
secretLeases, err := storage.GetByType(ctx, cacheboltdb.SecretLeaseType) secretLeases, err := storage.GetByType(ctx, cacheboltdb.SecretLeaseType)
if err != nil { if err != nil {
return err errors = multierror.Append(errors, err)
} } else {
if err := c.restoreLeases(secretLeases); err != nil { if err := c.restoreLeases(secretLeases); err != nil {
return err errors = multierror.Append(errors, err)
}
} }
return nil return errors.ErrorOrNil()
} }
func (c *LeaseCache) restoreTokens(tokens [][]byte) error { func (c *LeaseCache) restoreTokens(tokens [][]byte) error {
var errors *multierror.Error
for _, token := range tokens { for _, token := range tokens {
newIndex, err := cachememdb.Deserialize(token) newIndex, err := cachememdb.Deserialize(token)
if err != nil { if err != nil {
return err errors = multierror.Append(errors, err)
continue
} }
newIndex.RenewCtxInfo = c.createCtxInfo(nil) newIndex.RenewCtxInfo = c.createCtxInfo(nil)
if err := c.db.Set(newIndex); err != nil { if err := c.db.Set(newIndex); err != nil {
return err errors = multierror.Append(errors, err)
continue
} }
c.logger.Trace("restored token", "id", newIndex.ID) c.logger.Trace("restored token", "id", newIndex.ID)
} }
return nil
return errors.ErrorOrNil()
} }
func (c *LeaseCache) restoreLeases(leases [][]byte) error { func (c *LeaseCache) restoreLeases(leases [][]byte) error {
var errors *multierror.Error
for _, lease := range leases { for _, lease := range leases {
newIndex, err := cachememdb.Deserialize(lease) newIndex, err := cachememdb.Deserialize(lease)
if err != nil { if err != nil {
return err errors = multierror.Append(errors, err)
continue
} }
// Check if this lease has already expired // Check if this lease has already expired
@@ -1031,14 +1045,17 @@ func (c *LeaseCache) restoreLeases(leases [][]byte) error {
} }
if err := c.restoreLeaseRenewCtx(newIndex); err != nil { if err := c.restoreLeaseRenewCtx(newIndex); err != nil {
return err errors = multierror.Append(errors, err)
continue
} }
if err := c.db.Set(newIndex); err != nil { if err := c.db.Set(newIndex); err != nil {
return err errors = multierror.Append(errors, err)
continue
} }
c.logger.Trace("restored lease", "id", newIndex.ID, "path", newIndex.RequestPath) c.logger.Trace("restored lease", "id", newIndex.ID, "path", newIndex.RequestPath)
} }
return nil
return errors.ErrorOrNil()
} }
// restoreLeaseRenewCtx re-creates a RenewCtx for an index object and starts // restoreLeaseRenewCtx re-creates a RenewCtx for an index object and starts

View File

@@ -16,6 +16,7 @@ import (
"github.com/go-test/deep" "github.com/go-test/deep"
hclog "github.com/hashicorp/go-hclog" hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/vault/api" "github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/command/agent/cache/cacheboltdb" "github.com/hashicorp/vault/command/agent/cache/cacheboltdb"
"github.com/hashicorp/vault/command/agent/cache/cachememdb" "github.com/hashicorp/vault/command/agent/cache/cachememdb"
@@ -711,13 +712,20 @@ func setupBoltStorage(t *testing.T) (tempCacheDir string, boltStorage *cachebolt
} }
func TestLeaseCache_PersistAndRestore(t *testing.T) { func TestLeaseCache_PersistAndRestore(t *testing.T) {
// Emulate 4 responses from the api proxy. The first two use the auto-auth // Emulate responses from the api proxy. The first two use the auto-auth
// token, and the last two use another token. // token, and the others use another token.
// Each request is sent twice: the first send consumes one proxied response and
// the second must be served from the cache, so responses and cacheTests must always have the same length.
responses := []*SendResponse{ responses := []*SendResponse{
newTestSendResponse(200, `{"auth": {"client_token": "testtoken", "renewable": true, "lease_duration": 600}}`), newTestSendResponse(200, `{"auth": {"client_token": "testtoken", "renewable": true, "lease_duration": 600}}`),
newTestSendResponse(201, `{"lease_id": "foo", "renewable": true, "data": {"value": "foo"}, "lease_duration": 600}`), newTestSendResponse(201, `{"lease_id": "foo", "renewable": true, "data": {"value": "foo"}, "lease_duration": 600}`),
// The auth token will get manually deleted from the bolt DB storage, causing both of the following two responses
// to be missing from the cache after a restore, because the lease is a child of the auth token.
newTestSendResponse(202, `{"auth": {"client_token": "testtoken2", "renewable": true, "orphan": true, "lease_duration": 600}}`), newTestSendResponse(202, `{"auth": {"client_token": "testtoken2", "renewable": true, "orphan": true, "lease_duration": 600}}`),
newTestSendResponse(203, `{"lease_id": "secret2-lease", "renewable": true, "data": {"number": "two"}, "lease_duration": 600}`), newTestSendResponse(203, `{"lease_id": "secret2-lease", "renewable": true, "data": {"number": "two"}, "lease_duration": 600}`),
// Avoid status code 204 here: No Content responses get special handling.
newTestSendResponse(250, `{"auth": {"client_token": "testtoken3", "renewable": true, "orphan": true, "lease_duration": 600}}`),
newTestSendResponse(251, `{"lease_id": "secret3-lease", "renewable": true, "data": {"number": "three"}, "lease_duration": 600}`),
} }
tempDir, boltStorage := setupBoltStorage(t) tempDir, boltStorage := setupBoltStorage(t)
@@ -726,59 +734,82 @@ func TestLeaseCache_PersistAndRestore(t *testing.T) {
lc := testNewLeaseCacheWithPersistence(t, responses, boltStorage) lc := testNewLeaseCacheWithPersistence(t, responses, boltStorage)
// Register an auto-auth token so that the token and lease requests are cached // Register an auto-auth token so that the token and lease requests are cached
lc.RegisterAutoAuthToken("autoauthtoken") err := lc.RegisterAutoAuthToken("autoauthtoken")
require.NoError(t, err)
cacheTests := []struct { cacheTests := []struct {
token string token string
method string method string
urlPath string urlPath string
body string body string
wantStatusCode int deleteFromPersistentStore bool // If true, will be deleted from bolt DB to induce an error on restore
expectMissingAfterRestore bool // If true, the response is not expected to be present in the restored cache
}{ }{
{ {
// Make a request. A response with a new token is returned to the // Make a request. A response with a new token is returned to the
// lease cache and that will be cached. // lease cache and that will be cached.
token: "autoauthtoken", token: "autoauthtoken",
method: "GET", method: "GET",
urlPath: "http://example.com/v1/sample/api", urlPath: "http://example.com/v1/sample/api",
body: `{"value": "input"}`, body: `{"value": "input"}`,
wantStatusCode: responses[0].Response.StatusCode,
}, },
{ {
// Modify the request a little bit to ensure the second response is // Modify the request a little bit to ensure the second response is
// returned to the lease cache. // returned to the lease cache.
token: "autoauthtoken", token: "autoauthtoken",
method: "GET", method: "GET",
urlPath: "http://example.com/v1/sample/api", urlPath: "http://example.com/v1/sample/api",
body: `{"value": "input_changed"}`, body: `{"value": "input_changed"}`,
wantStatusCode: responses[1].Response.StatusCode,
}, },
{ {
// Simulate an approle login to get another token // Simulate an approle login to get another token
method: "PUT", method: "PUT",
urlPath: "http://example.com/v1/auth/approle/login", urlPath: "http://example.com/v1/auth/approle-expect-missing/login",
body: `{"role_id": "my role", "secret_id": "my secret"}`, body: `{"role_id": "my role", "secret_id": "my secret"}`,
wantStatusCode: responses[2].Response.StatusCode, deleteFromPersistentStore: true,
expectMissingAfterRestore: true,
}, },
{ {
// Test caching with the token acquired from the approle login // Test caching with the token acquired from the approle login
token: "testtoken2", token: "testtoken2",
method: "GET", method: "GET",
urlPath: "http://example.com/v1/sample2/api", urlPath: "http://example.com/v1/sample-expect-missing/api",
body: `{"second": "input"}`, body: `{"second": "input"}`,
wantStatusCode: responses[3].Response.StatusCode, // This will be missing from the restored cache because its parent token was deleted
expectMissingAfterRestore: true,
},
{
// Simulate another approle login to get another token
method: "PUT",
urlPath: "http://example.com/v1/auth/approle/login",
body: `{"role_id": "my role", "secret_id": "my secret"}`,
},
{
// Test caching with the token acquired from the latest approle login
token: "testtoken3",
method: "GET",
urlPath: "http://example.com/v1/sample3/api",
body: `{"third": "input"}`,
}, },
} }
for _, ct := range cacheTests { var deleteIDs []string
for i, ct := range cacheTests {
// Send once to cache // Send once to cache
sendReq := &SendRequest{ sendReq := &SendRequest{
Token: ct.token, Token: ct.token,
Request: httptest.NewRequest(ct.method, ct.urlPath, strings.NewReader(ct.body)), Request: httptest.NewRequest(ct.method, ct.urlPath, strings.NewReader(ct.body)),
} }
if ct.deleteFromPersistentStore {
deleteID, err := computeIndexID(sendReq)
require.NoError(t, err)
deleteIDs = append(deleteIDs, deleteID)
// Now reset the body after calculating the index
sendReq.Request = httptest.NewRequest(ct.method, ct.urlPath, strings.NewReader(ct.body))
}
resp, err := lc.Send(context.Background(), sendReq) resp, err := lc.Send(context.Background(), sendReq)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, resp.Response.StatusCode, ct.wantStatusCode, "expected proxied response") assert.Equal(t, responses[i].Response.StatusCode, resp.Response.StatusCode, "expected proxied response")
assert.Nil(t, resp.CacheMeta) assert.Nil(t, resp.CacheMeta)
// Send again to test cache. If this isn't cached, the response returned // Send again to test cache. If this isn't cached, the response returned
@@ -789,24 +820,36 @@ func TestLeaseCache_PersistAndRestore(t *testing.T) {
} }
respCached, err := lc.Send(context.Background(), sendCacheReq) respCached, err := lc.Send(context.Background(), sendCacheReq)
require.NoError(t, err, "failed to send request %+v", ct) require.NoError(t, err, "failed to send request %+v", ct)
assert.Equal(t, respCached.Response.StatusCode, ct.wantStatusCode, "expected proxied response") assert.Equal(t, responses[i].Response.StatusCode, respCached.Response.StatusCode, "expected proxied response")
require.NotNil(t, respCached.CacheMeta) require.NotNil(t, respCached.CacheMeta)
assert.True(t, respCached.CacheMeta.Hit) assert.True(t, respCached.CacheMeta.Hit)
} }
// Now we know the cache is working, so try restoring from the persisted require.NotEmpty(t, deleteIDs)
// cache's storage for _, deleteID := range deleteIDs {
restoredCache := testNewLeaseCache(t, nil) err = boltStorage.Delete(deleteID)
require.NoError(t, err)
}
err := restoredCache.Restore(context.Background(), boltStorage) // Now we know the cache is working, so try restoring from the persisted
assert.NoError(t, err) // cache's storage. Responses 3 and 4 have been cleared from the cache, so
// re-send those.
restoredCache := testNewLeaseCache(t, responses[2:4])
err = restoredCache.Restore(context.Background(), boltStorage)
errors, ok := err.(*multierror.Error)
require.True(t, ok)
assert.Len(t, errors.Errors, 1)
assert.Contains(t, errors.Error(), "could not find parent Token testtoken2")
// Now compare before and after // Now compare before and after
beforeDB, err := lc.db.GetByPrefix(cachememdb.IndexNameID) beforeDB, err := lc.db.GetByPrefix(cachememdb.IndexNameID)
require.NoError(t, err) require.NoError(t, err)
assert.Len(t, beforeDB, 5) assert.Len(t, beforeDB, 7)
for _, cachedItem := range beforeDB { for _, cachedItem := range beforeDB {
if strings.Contains(cachedItem.RequestPath, "expect-missing") {
continue
}
restoredItem, err := restoredCache.db.Get(cachememdb.IndexNameID, cachedItem.ID) restoredItem, err := restoredCache.db.Get(cachememdb.IndexNameID, cachedItem.ID)
require.NoError(t, err) require.NoError(t, err)
@@ -838,17 +881,21 @@ func TestLeaseCache_PersistAndRestore(t *testing.T) {
assert.Len(t, afterDB, 5) assert.Len(t, afterDB, 5)
// And finally send the cache requests once to make sure they're all being // And finally send the cache requests once to make sure they're all being
// served from the restoredCache // served from the restoredCache unless they were intended to be missing after restore.
for _, ct := range cacheTests { for i, ct := range cacheTests {
sendCacheReq := &SendRequest{ sendCacheReq := &SendRequest{
Token: ct.token, Token: ct.token,
Request: httptest.NewRequest(ct.method, ct.urlPath, strings.NewReader(ct.body)), Request: httptest.NewRequest(ct.method, ct.urlPath, strings.NewReader(ct.body)),
} }
respCached, err := restoredCache.Send(context.Background(), sendCacheReq) respCached, err := restoredCache.Send(context.Background(), sendCacheReq)
require.NoError(t, err, "failed to send request %+v", ct) require.NoError(t, err, "failed to send request %+v", ct)
assert.Equal(t, respCached.Response.StatusCode, ct.wantStatusCode, "expected proxied response") assert.Equal(t, responses[i].Response.StatusCode, respCached.Response.StatusCode, "expected proxied response")
require.NotNil(t, respCached.CacheMeta) if ct.expectMissingAfterRestore {
assert.True(t, respCached.CacheMeta.Hit) require.Nil(t, respCached.CacheMeta)
} else {
require.NotNil(t, respCached.CacheMeta)
assert.True(t, respCached.CacheMeta.Hit)
}
} }
} }