Stop processing ACME verifications when active node is stepped down (#23278)
- Do not load existing ACME challenges persisted within storage on non-active nodes. This was the main culprit of the issue: secondary nodes would load persisted challenges and try to resolve them, but their writes would fail, leading to excessive logging.
- We now handle this by not starting the ACME background thread on non-active nodes, and by checking within the scheduling loop and breaking out. Breaking out forces a re-read of the Closing channel, which should have been closed by the PKI plugin's Cleanup method.
- If a node is stepped down from being the active node while it is actively processing a verification, we could get into an infinite loop due to an ErrReadOnly error when attempting to clean up a challenge entry.
- Add a maximum number of retries for errors around decoding or fetching challenge/authorization entries from disk. We allow double the "normal" maximum ACME retry attempts for these errors to avoid colliding with the regular retry counter. Note that these additional retry attempts are not persisted to disk and reset on every node start.
- Add a 1 second backoff to any disk-related error so we do not immediately spin on disk/IO errors for challenges. A minimal sketch of this guard-and-backoff pattern follows the list.
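The sketch below is a minimal, self-contained illustration of the pattern this commit applies, not the plugin's actual code: skip scheduling work on non-active nodes, cap in-memory retries at twice the normal limit, and back off one second on storage errors. The `isStandby` and `loadChallenge` helpers, the `challenge` type, and the `maxRetryAttempts` constant are hypothetical stand-ins for the plugin's `b.System().ReplicationState().HasState(...)` check, its storage reads, `ChallengeQueueEntry`, and `MaxRetryAttempts`.

// Minimal sketch (not the actual Vault implementation) of the scheduling guard
// added by this commit: bail out on non-active nodes, bound in-memory retries,
// and back off one second on storage errors instead of spinning.
package main

import (
    "errors"
    "fmt"
    "time"
)

const maxRetryAttempts = 5 // hypothetical stand-in for the plugin's MaxRetryAttempts

// challenge is a hypothetical stand-in for ChallengeQueueEntry.
type challenge struct {
    id         string
    retryAfter time.Time
    numRetries int // in-memory only; not persisted, resets on node restart
}

// isStandby stands in for
// b.System().ReplicationState().HasState(consts.ReplicationDRSecondary | consts.ReplicationPerformanceStandby).
func isStandby() bool { return false }

// loadChallenge simulates a storage read that keeps failing.
func loadChallenge(id string) error {
    return errors.New("simulated storage error")
}

func processQueue(queue []*challenge) {
    for _, c := range queue {
        // If this node is no longer the active node, stop scheduling work entirely.
        if isStandby() {
            return
        }

        if err := loadChallenge(c.id); err != nil {
            c.numRetries++
            // Give up once we exceed double the normal retry budget so a
            // corrupted or unreadable entry cannot loop forever.
            if c.numRetries > maxRetryAttempts*2 {
                fmt.Printf("giving up on challenge %s: %v\n", c.id, err)
                continue
            }
            // Back off a second so disk/IO errors are not retried in a hot loop.
            c.retryAfter = time.Now().Add(1 * time.Second)
            continue
        }
        fmt.Printf("challenge %s verified\n", c.id)
    }
}

func main() {
    queue := []*challenge{{id: "example"}}
    // Drive the queue a few times to show the retry cap kicking in.
    for i := 0; i < maxRetryAttempts*2+1; i++ {
        processQueue(queue)
    }
}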
@@ -10,6 +10,7 @@ import (
     "sync"
     "time"

+    "github.com/hashicorp/vault/sdk/helper/consts"
     "github.com/hashicorp/vault/sdk/logical"
 )

@@ -42,6 +43,7 @@ type ChallengeValidation struct {
 type ChallengeQueueEntry struct {
     Identifier string
     RetryAfter time.Time
+    NumRetries int // Track if we are spinning on a corrupted challenge
 }

 type ACMEChallengeEngine struct {
@@ -97,7 +99,7 @@ func (ace *ACMEChallengeEngine) Run(b *backend, state *acmeState, sc *storageCon
         b.Logger().Error("failed loading existing ACME challenge validations:", "err", err)
     }

-    for true {
+    for {
         // err == nil on shutdown.
         b.Logger().Debug("Starting ACME challenge validation engine")
         err := ace._run(b, state)
@@ -119,7 +121,7 @@ func (ace *ACMEChallengeEngine) _run(b *backend, state *acmeState) error {
     // We want at most a certain number of workers operating to verify
     // challenges.
     var finishedWorkersChannels []chan bool
-    for true {
+    for {
         // Wait until we've got more work to do.
         select {
         case <-ace.Closing:
@@ -201,12 +203,17 @@ func (ace *ACMEChallengeEngine) _run(b *backend, state *acmeState) error {
             // looping through the queue until we hit a repeat.
             firstIdentifier = ""

+            // If we are no longer the active node, break out
+            if b.System().ReplicationState().HasState(consts.ReplicationDRSecondary | consts.ReplicationPerformanceStandby) {
+                break
+            }
+
             // Here, we got a piece of work that is ready to check; create a
             // channel and a new go routine and run it. Note that this still
             // could have a RetryAfter date we're not aware of (e.g., if the
             // cluster restarted as we do not read the entries there).
             channel := make(chan bool, 1)
-            go ace.VerifyChallenge(runnerSC, task.Identifier, channel, config)
+            go ace.VerifyChallenge(runnerSC, task.Identifier, task.NumRetries, channel, config)
             finishedWorkersChannels = append(finishedWorkersChannels, channel)
             startedWork = true
         }
@@ -305,8 +312,9 @@ func (ace *ACMEChallengeEngine) AcceptChallenge(sc *storageContext, account stri
     return nil
 }

-func (ace *ACMEChallengeEngine) VerifyChallenge(runnerSc *storageContext, id string, finished chan bool, config *acmeConfigEntry) {
-    sc, _ /* cancel func */ := runnerSc.WithFreshTimeout(MaxChallengeTimeout)
+func (ace *ACMEChallengeEngine) VerifyChallenge(runnerSc *storageContext, id string, validationQueueRetries int, finished chan bool, config *acmeConfigEntry) {
+    sc, cancel := runnerSc.WithFreshTimeout(MaxChallengeTimeout)
+    defer cancel()
     runnerSc.Backend.Logger().Debug("Starting verification of challenge", "id", id)

     if retry, retryAfter, err := ace._verifyChallenge(sc, id, config); err != nil {
@@ -316,11 +324,28 @@ func (ace *ACMEChallengeEngine) VerifyChallenge(runnerSc *storageContext, id str
         sc.Backend.Logger().Error(fmt.Sprintf("ACME validation failed for %v: %v", id, err))

         if retry {
+            validationQueueRetries++
+
+            // The retry logic within _verifyChallenge is dependent on being able to read and decode
+            // the ACME challenge entries. If we encounter such failures we would retry forever, so
+            // we have a secondary check here to see if we are consistently looping within the validation
+            // queue that is larger than the normal retry attempts we would allow.
+            if validationQueueRetries > MaxRetryAttempts*2 {
+                sc.Backend.Logger().Warn("reached max error attempts within challenge queue: %v, giving up", id)
+                _, _, err = ace._verifyChallengeCleanup(sc, nil, id)
+                if err != nil {
+                    sc.Backend.Logger().Warn("Failed cleaning up challenge entry: %v", err)
+                }
+                finished <- true
+                return
+            }
+
             ace.ValidationLock.Lock()
             defer ace.ValidationLock.Unlock()
             ace.Validations.PushBack(&ChallengeQueueEntry{
                 Identifier: id,
                 RetryAfter: retryAfter,
+                NumRetries: validationQueueRetries,
             })

             // Let the validator know there's a pending challenge.
@@ -343,23 +368,23 @@ func (ace *ACMEChallengeEngine) VerifyChallenge(runnerSc *storageContext, id str

 func (ace *ACMEChallengeEngine) _verifyChallenge(sc *storageContext, id string, config *acmeConfigEntry) (bool, time.Time, error) {
     now := time.Now()
+    backoffTime := now.Add(1 * time.Second)
     path := acmeValidationPrefix + id
     challengeEntry, err := sc.Storage.Get(sc.Context, path)
     if err != nil {
-        return true, now, fmt.Errorf("error loading challenge %v: %w", id, err)
+        return true, backoffTime, fmt.Errorf("error loading challenge %v: %w", id, err)
     }

     if challengeEntry == nil {
         // Something must've successfully cleaned up our storage entry from
         // under us. Assume we don't need to rerun, else the client will
         // trigger us to re-run.
-        err = nil
-        return ace._verifyChallengeCleanup(sc, err, id)
+        return ace._verifyChallengeCleanup(sc, nil, id)
     }

     var cv *ChallengeValidation
     if err := challengeEntry.DecodeJSON(&cv); err != nil {
-        return true, now, fmt.Errorf("error decoding challenge %v: %w", id, err)
+        return true, backoffTime, fmt.Errorf("error decoding challenge %v: %w", id, err)
     }

     if now.Before(cv.RetryAfter) {
@@ -369,7 +394,7 @@ func (ace *ACMEChallengeEngine) _verifyChallenge(sc *storageContext, id string,
     authzPath := getAuthorizationPath(cv.Account, cv.Authorization)
     authz, err := loadAuthorizationAtPath(sc, authzPath)
     if err != nil {
-        return true, now, fmt.Errorf("error loading authorization %v/%v for challenge %v: %w", cv.Account, cv.Authorization, id, err)
+        return true, backoffTime, fmt.Errorf("error loading authorization %v/%v for challenge %v: %w", cv.Account, cv.Authorization, id, err)
     }

     if authz.Status != ACMEAuthorizationPending {
@@ -527,7 +552,7 @@ func (ace *ACMEChallengeEngine) _verifyChallengeCleanup(sc *storageContext, err

     // Remove our ChallengeValidation entry only.
     if deleteErr := sc.Storage.Delete(sc.Context, acmeValidationPrefix+id); deleteErr != nil {
-        return true, now.Add(-1 * time.Second), fmt.Errorf("error deleting challenge %v (error prior to cleanup, if any: %v): %w", id, err, deleteErr)
+        return true, now.Add(1 * time.Second), fmt.Errorf("error deleting challenge %v (error prior to cleanup, if any: %v): %w", id, err, deleteErr)
     }

     if err != nil {
@@ -18,6 +18,7 @@ import (

     "github.com/hashicorp/go-secure-stdlib/nonceutil"
     "github.com/hashicorp/vault/sdk/framework"
+    "github.com/hashicorp/vault/sdk/helper/consts"
     "github.com/hashicorp/vault/sdk/logical"
 )

@@ -76,6 +77,13 @@ func (a *acmeState) Initialize(b *backend, sc *storageContext) error {
         return fmt.Errorf("error initializing ACME engine: %w", err)
     }

+    if b.System().ReplicationState().HasState(consts.ReplicationDRSecondary | consts.ReplicationPerformanceStandby) {
+        // It is assumed, that if the node does become the active node later
+        // the plugin is re-initialized, so this is safe. It also spares the node
+        // from loading the existing queue into memory for no reason.
+        b.Logger().Debug("Not on an active node, skipping starting ACME challenge validation engine")
+        return nil
+    }
     // Kick off our ACME challenge validation engine.
     go a.validator.Run(b, a, sc)

@@ -64,9 +64,10 @@ func (ts *TestServer) setupRunner(domain string, network string) {
         ContainerName: "bind9-dns-" + strings.ReplaceAll(domain, ".", "-"),
         NetworkName:   network,
         Ports:         []string{"53/udp"},
-        LogConsumer: func(s string) {
-            ts.log.Info(s)
-        },
+        // DNS container logging was disabled to reduce content within CI logs.
+        //LogConsumer: func(s string) {
+        //    ts.log.Info(s)
+        //},
     })
     require.NoError(ts.t, err)
 }
changelog/23278.txt (new file, 3 lines added)
@@ -0,0 +1,3 @@
+```release-note:bug
+secrets/pki: Stop processing in-flight ACME verifications when an active node steps down
+```