Signal ACME challenge engine if existing challenges were loaded on startup (#21115)

* Signal ACME challenge engine if existing challenges were loaded

 - Addresses an issue where existing challenges on disk were not processed after a Vault restart until a new challenge was accepted
 - Move loading of existing challenges from the plugin's initialize method into the challenge engine's thread
 - Add a docker test that validates the fix and that ACME works across standby nodes.

* Add changelog
This commit is contained in:
Steven Clark
2023-06-12 11:09:20 -04:00
committed by GitHub
parent f2887a2535
commit c855ba6a90
5 changed files with 232 additions and 18 deletions

View File

@@ -60,14 +60,6 @@ func NewACMEChallengeEngine() *ACMEChallengeEngine {
return ace
}
// Initialize loads any in-progress challenge validations from storage so the
// engine can resume them. (This commit removes this method: loading now
// happens inside the engine's Run thread to avoid delaying PKI mount
// initialization.)
func (ace *ACMEChallengeEngine) Initialize(b *backend, sc *storageContext) error {
	if err := ace.LoadFromStorage(b, sc); err != nil {
		return fmt.Errorf("failed loading initial in-progress validations: %w", err)
	}
	return nil
}
func (ace *ACMEChallengeEngine) LoadFromStorage(b *backend, sc *storageContext) error {
items, err := sc.Storage.List(sc.Context, acmeValidationPrefix)
if err != nil {
@@ -78,16 +70,30 @@ func (ace *ACMEChallengeEngine) LoadFromStorage(b *backend, sc *storageContext)
defer ace.ValidationLock.Unlock()
// Add them to our queue of validations to work through later.
foundExistingValidations := false
for _, item := range items {
ace.Validations.PushBack(&ChallengeQueueEntry{
Identifier: item,
})
foundExistingValidations = true
}
if foundExistingValidations {
ace.NewValidation <- "existing"
}
return nil
}
func (ace *ACMEChallengeEngine) Run(b *backend, state *acmeState) {
func (ace *ACMEChallengeEngine) Run(b *backend, state *acmeState, sc *storageContext) {
// We load the existing ACME challenges within the Run thread to avoid
// delaying the PKI mount initialization
b.Logger().Debug("Loading existing challenge validations on disk")
err := ace.LoadFromStorage(b, sc)
if err != nil {
b.Logger().Error("failed loading existing ACME challenge validations:", "err", err)
}
for true {
// err == nil on shutdown.
b.Logger().Debug("Starting ACME challenge validation engine")

View File

@@ -77,10 +77,7 @@ func (a *acmeState) Initialize(b *backend, sc *storageContext) error {
}
// Kick off our ACME challenge validation engine.
if err := a.validator.Initialize(b, sc); err != nil {
return fmt.Errorf("error initializing ACME engine: %w", err)
}
go a.validator.Run(b, a)
go a.validator.Run(b, a, sc)
// All good.
return nil

View File

@@ -13,6 +13,7 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/hex"
"fmt"
"net"
"net/http"
"path"
@@ -58,6 +59,9 @@ func Test_ACME(t *testing.T) {
})
}
})
// Do not run these tests in parallel.
t.Run("step down", func(gt *testing.T) { SubtestACMEStepDownNode(gt, cluster) })
}
func SubtestACMECertbot(t *testing.T, cluster *VaultPkiCluster) {
@@ -742,6 +746,181 @@ func SubtestACMEPreventsICADNS(t *testing.T, cluster *VaultPkiCluster) {
pki.RemoveDNSRecordsForDomain(hostname)
}
// SubtestACMEStepDownNode verifies that we can properly run an ACME session through a
// secondary node, and midway through the challenge verification process, seal the
// active node and make sure we can complete the ACME session on the new active node.
func SubtestACMEStepDownNode(t *testing.T, cluster *VaultPkiCluster) {
	pki, err := cluster.CreateAcmeMount("stepdown-test")
	require.NoError(t, err)

	// Since we interact with ACME from outside the container network the ACME
	// configuration needs to be updated to use the host port and not the internal
	// docker ip. We also grab the non-active node here on purpose to verify
	// ACME related APIs are properly forwarded across standby hosts.
	nonActiveNodes := pki.GetNonActiveNodes()
	require.GreaterOrEqual(t, len(nonActiveNodes), 1, "Need at least one non-active node")

	nonActiveNode := nonActiveNodes[0]

	basePath := fmt.Sprintf("https://%s/v1/%s", nonActiveNode.HostPort, pki.mount)
	err = pki.UpdateClusterConfig(map[string]interface{}{
		"path": basePath,
	})
	// Fix: this error was previously dropped; a failed config update would have
	// produced confusing downstream ACME failures.
	require.NoError(t, err, "failed updating ACME cluster config")

	hostname := "go-lang-stepdown-client.dadgarcorp.com"
	acmeOrderIdentifiers := []acme.AuthzID{
		{Type: "dns", Value: hostname},
	}
	cr := &x509.CertificateRequest{
		// NOTE(review): hostname is listed twice; presumably unintentional but
		// harmless in a CSR — confirm.
		DNSNames: []string{hostname, hostname},
	}

	accountKey, err := rsa.GenerateKey(rand.Reader, 2048)
	require.NoError(t, err, "failed creating rsa account key")

	acmeClient := &acme.Client{
		Key: accountKey,
		HTTPClient: &http.Client{Transport: &http.Transport{
			// The docker cluster's certificates are not trusted by the host,
			// so certificate verification is skipped for the test client.
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		}},
		DirectoryURL: basePath + "/acme/directory",
	}

	testCtx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancelFunc()

	// Create new account
	_, err = acmeClient.Register(testCtx, &acme.Account{Contact: []string{"mailto:ipsans@dadgarcorp.com"}},
		func(tosURL string) bool { return true })
	require.NoError(t, err, "failed registering account")

	// Create an ACME order
	order, err := acmeClient.AuthorizeOrder(testCtx, acmeOrderIdentifiers)
	require.NoError(t, err, "failed creating ACME order")
	require.Len(t, order.AuthzURLs, 1, "expected a single authz url")
	authUrl := order.AuthzURLs[0]

	authorization, err := acmeClient.GetAuthorization(testCtx, authUrl)
	require.NoError(t, err, "failed to lookup authorization at url: %s", authUrl)

	dnsTxtRecordsToAdd := map[string]string{}

	var challengesToAccept []*acme.Challenge
	for _, challenge := range authorization.Challenges {
		if challenge.Status != acme.StatusPending {
			t.Logf("ignoring challenge not in status pending: %v", challenge)
			continue
		}

		if challenge.Type == "dns-01" {
			challengeBody, err := acmeClient.DNS01ChallengeRecord(challenge.Token)
			require.NoError(t, err, "failed generating challenge response")

			// Collect the challenges for us to add the DNS records after step-down
			dnsTxtRecordsToAdd["_acme-challenge."+authorization.Identifier.Value] = challengeBody
			challengesToAccept = append(challengesToAccept, challenge)
		}
	}

	// FIXME: This sleep seems to resolve the raft issue if running the test individually
	// time.Sleep(20 * time.Second)

	// Tell the ACME server, that they can now validate those challenges, this will cause challenge
	// verification failures on the main node as the DNS records do not exist.
	for _, challenge := range challengesToAccept {
		_, err = acmeClient.Accept(testCtx, challenge)
		require.NoError(t, err, "failed to accept challenge: %v", challenge)
	}

	// Now wait till we start seeing the challenge engine start failing the lookups.
	testhelpers.RetryUntil(t, 10*time.Second, func() error {
		myAuth, err := acmeClient.GetAuthorization(testCtx, authUrl)
		require.NoError(t, err, "failed to lookup authorization at url: %s", authUrl)

		for _, challenge := range myAuth.Challenges {
			if challenge.Error != nil {
				// The engine failed on one of the challenges, we are done waiting
				return nil
			}
		}

		return fmt.Errorf("no challenges for auth %v contained any errors", myAuth.Identifier)
	})

	// Seal the active node now and wait for the next node to appear
	previousActiveNode := pki.GetActiveClusterNode()
	t.Logf("Stepping down node id: %s", previousActiveNode.NodeID)

	// Best-effort HA status dump for debugging; the error is intentionally ignored.
	haStatus, _ := previousActiveNode.APIClient().Sys().HAStatus()
	t.Logf("Node: %v HaStatus: %v\n", previousActiveNode.NodeID, haStatus)

	// Wait for raft autopilot to report a healthy cluster before sealing so the
	// remaining nodes are able to elect a new leader.
	testhelpers.RetryUntil(t, 2*time.Minute, func() error {
		state, err := previousActiveNode.APIClient().Sys().RaftAutopilotState()
		if err != nil {
			return err
		}

		t.Logf("Node: %v Raft AutoPilotState: %v\n", previousActiveNode.NodeID, state)

		if !state.Healthy {
			return fmt.Errorf("raft auto pilot state is not healthy")
		}
		return nil
	})

	t.Logf("Sealing active node")
	err = previousActiveNode.APIClient().Sys().Seal()
	require.NoError(t, err, "failed stepping down node")

	// Add our DNS records now
	t.Logf("Adding DNS records")
	for dnsHost, dnsValue := range dnsTxtRecordsToAdd {
		err = pki.AddDNSRecord(dnsHost, "TXT", dnsValue)
		require.NoError(t, err, "failed adding DNS record: %s:%s", dnsHost, dnsValue)
	}

	// Wait for our new active node to come up
	testhelpers.RetryUntil(t, 2*time.Minute, func() error {
		newNode := pki.GetActiveClusterNode()
		if newNode.NodeID == previousActiveNode.NodeID {
			return fmt.Errorf("existing node is still the leader after stepdown: %s", newNode.NodeID)
		}

		t.Logf("New active node has node id: %v", newNode.NodeID)
		return nil
	})

	// Wait for the order/challenges to be validated.
	_, err = acmeClient.WaitOrder(testCtx, order.URI)
	if err != nil {
		// We failed waiting for the order to become ready, lets print out current challenge statuses to help debugging
		myAuth, authErr := acmeClient.GetAuthorization(testCtx, authUrl)
		require.NoError(t, authErr, "failed to lookup authorization at url: %s and wait order failed with: %v", authUrl, err)

		t.Logf("Authorization Status: %s", myAuth.Status)
		for _, challenge := range myAuth.Challenges {
			t.Logf("challenge: %v state: %v Error: %v", challenge.Type, challenge.Status, challenge.Error)
		}

		require.NoError(t, err, "failed waiting for order to be ready")
	}

	// Create/sign the CSR and ask ACME server to sign it returning us the final certificate
	csrKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	// Fix: this error was previously overwritten by the next call before being checked.
	require.NoError(t, err, "failed generating csr key")

	csr, err := x509.CreateCertificateRequest(rand.Reader, cr, csrKey)
	require.NoError(t, err, "failed generating csr")

	certs, _, err := acmeClient.CreateOrderCert(testCtx, order.FinalizeURL, csr, false)
	require.NoError(t, err, "failed to get a certificate back from ACME")

	_, err = x509.ParseCertificate(certs[0])
	require.NoError(t, err, "failed parsing acme cert bytes")
}
func getDockerLog(t *testing.T) (func(s string), *pkiext.LogConsumerWriter, *pkiext.LogConsumerWriter) {
logConsumer := func(s string) {
t.Logf(s)

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"os"
"testing"
"time"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/builtin/logical/pki/dnstest"
@@ -37,7 +38,7 @@ func NewVaultPkiCluster(t *testing.T) *VaultPkiCluster {
VaultNodeConfig: &testcluster.VaultNodeConfig{
LogLevel: "TRACE",
},
NumCores: 1,
NumCores: 3,
},
}
@@ -60,8 +61,36 @@ func (vpc *VaultPkiCluster) Cleanup() {
}
}
// GetActiveClusterNode waits up to 60 seconds for some node in the cluster to
// become active and returns it, panicking if no node takes leadership in time.
func (vpc *VaultPkiCluster) GetActiveClusterNode() *docker.DockerClusterNode {
	waitCtx, cancelWait := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancelWait()

	activeIdx, err := testcluster.WaitForActiveNode(waitCtx, vpc.cluster)
	if err != nil {
		panic(fmt.Sprintf("no cluster node became active in timeout window: %v", err))
	}

	return vpc.cluster.ClusterNodes[activeIdx]
}
// GetNonActiveNodes returns every cluster node that does not currently hold
// leadership. Nodes whose leader status cannot be queried are skipped.
func (vpc *VaultPkiCluster) GetNonActiveNodes() []*docker.DockerClusterNode {
	standbys := []*docker.DockerClusterNode{}
	for _, candidate := range vpc.cluster.ClusterNodes {
		status, err := candidate.APIClient().Sys().Leader()
		if err != nil || status.IsSelf {
			continue
		}
		standbys = append(standbys, candidate)
	}
	return standbys
}
func (vpc *VaultPkiCluster) GetActiveContainerHostPort() string {
return vpc.cluster.ClusterNodes[0].HostPort
return vpc.GetActiveClusterNode().HostPort
}
func (vpc *VaultPkiCluster) GetContainerNetworkName() string {
@@ -69,15 +98,15 @@ func (vpc *VaultPkiCluster) GetContainerNetworkName() string {
}
func (vpc *VaultPkiCluster) GetActiveContainerIP() string {
return vpc.cluster.ClusterNodes[0].ContainerIPAddress
return vpc.GetActiveClusterNode().ContainerIPAddress
}
func (vpc *VaultPkiCluster) GetActiveContainerID() string {
return vpc.cluster.ClusterNodes[0].Container.ID
return vpc.GetActiveClusterNode().Container.ID
}
func (vpc *VaultPkiCluster) GetActiveNode() *api.Client {
return vpc.cluster.Nodes()[0].APIClient()
return vpc.GetActiveClusterNode().APIClient()
}
func (vpc *VaultPkiCluster) AddHostname(hostname, ip string) error {

3
changelog/21115.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
secrets/pki: Process pending ACME accepted challenges upon startup
```