[Storage/DynamoDB] Let vault modify dynamodb tables (#29371)

* [Storage/DynamoDB] Let vault modify dynamodb tables

* add changelog

---------

Co-authored-by: Violet Hynes <violet.hynes@hashicorp.com>
This commit is contained in:
Michael Diggin
2025-01-21 19:27:54 +00:00
committed by GitHub
parent 4ff9bdba90
commit 5b4b606c0d
4 changed files with 320 additions and 21 deletions

View File

@@ -48,6 +48,10 @@ const (
// that is used when none is configured explicitly.
DefaultDynamoDBWriteCapacity = 5
// DefaultDynamoDBBillingMode is the default billing mode
// that is used when none is configured explicitly.
DefaultDynamoDBBillingMode = "PROVISIONED"
// DynamoDBEmptyPath is the string that is used instead of
// empty strings when stored in DynamoDB.
DynamoDBEmptyPath = " "
@@ -167,6 +171,17 @@ func NewDynamoDBBackend(conf map[string]string, logger log.Logger) (physical.Bac
writeCapacity = DefaultDynamoDBWriteCapacity
}
billingMode := os.Getenv("AWS_DYNAMODB_BILLING_MODE")
if billingMode == "" {
billingMode = conf["billing_mode"]
if billingMode == "" {
billingMode = DefaultDynamoDBBillingMode
}
}
if billingMode != "PROVISIONED" && billingMode != "PAY_PER_REQUEST" {
return nil, fmt.Errorf("invalid billing mode: %q", billingMode)
}
endpoint := os.Getenv("AWS_DYNAMODB_ENDPOINT")
if endpoint == "" {
endpoint = conf["endpoint"]
@@ -198,6 +213,12 @@ func NewDynamoDBBackend(conf map[string]string, logger log.Logger) (physical.Bac
}
}
dynamodbAllowUpdates := os.Getenv("AWS_DYNAMODB_ALLOW_UPDATES")
if dynamodbAllowUpdates == "" {
dynamodbAllowUpdates = conf["dynamodb_allow_updates"]
}
allowUpdates := dynamodbAllowUpdates != ""
credsConfig := &awsutil.CredentialsConfig{
AccessKey: conf["access_key"],
SecretKey: conf["secret_key"],
@@ -228,7 +249,7 @@ func NewDynamoDBBackend(conf map[string]string, logger log.Logger) (physical.Bac
client := dynamodb.New(awsSession)
if err := ensureTableExists(client, table, readCapacity, writeCapacity); err != nil {
if err := ensureTableExists(client, table, readCapacity, writeCapacity, billingMode, allowUpdates); err != nil {
return nil, err
}
@@ -814,21 +835,17 @@ WatchLoop:
}
// ensureTableExists creates a DynamoDB table with a given
// DynamoDB client. If the table already exists, it is not
// being reconfigured.
func ensureTableExists(client *dynamodb.DynamoDB, table string, readCapacity, writeCapacity int) error {
_, err := client.DescribeTable(&dynamodb.DescribeTableInput{
// DynamoDB client.
// If the table already exists, it is not being reconfigured unless allowUpdates is true.
func ensureTableExists(client *dynamodb.DynamoDB, table string, readCapacity, writeCapacity int, billingMode string, allowUpdates bool) error {
tableDescription, err := client.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(table),
})
if err != nil {
if awsError, ok := err.(awserr.Error); ok {
if awsError.Code() == "ResourceNotFoundException" {
_, err := client.CreateTable(&dynamodb.CreateTableInput{
createTableRequest := &dynamodb.CreateTableInput{
TableName: aws.String(table),
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(int64(readCapacity)),
WriteCapacityUnits: aws.Int64(int64(writeCapacity)),
},
KeySchema: []*dynamodb.KeySchemaElement{{
AttributeName: aws.String("Path"),
KeyType: aws.String("HASH"),
@@ -843,7 +860,18 @@ func ensureTableExists(client *dynamodb.DynamoDB, table string, readCapacity, wr
AttributeName: aws.String("Key"),
AttributeType: aws.String("S"),
}},
})
}
if billingMode == "PAY_PER_REQUEST" {
// PAY_PER_REQUEST doesn't require setting capacity units
createTableRequest.BillingMode = aws.String(billingMode)
} else {
createTableRequest.BillingMode = aws.String(billingMode)
createTableRequest.ProvisionedThroughput = &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(int64(readCapacity)),
WriteCapacityUnits: aws.Int64(int64(writeCapacity)),
}
}
_, err := client.CreateTable(createTableRequest)
if err != nil {
return err
}
@@ -860,10 +888,50 @@ func ensureTableExists(client *dynamodb.DynamoDB, table string, readCapacity, wr
}
return err
}
if allowUpdates && shouldUpdateTable(tableDescription.Table, billingMode, readCapacity, writeCapacity) {
// this will only change the BillingMode or the read/write capacity
updateTableRequest := &dynamodb.UpdateTableInput{
TableName: aws.String(table),
}
if billingMode == "PAY_PER_REQUEST" {
// PAY_PER_REQUEST doesn't require setting capacity units
updateTableRequest.BillingMode = aws.String(billingMode)
} else {
updateTableRequest.BillingMode = aws.String(billingMode)
updateTableRequest.ProvisionedThroughput = &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(int64(readCapacity)),
WriteCapacityUnits: aws.Int64(int64(writeCapacity)),
}
}
_, err := client.UpdateTable(updateTableRequest)
if err != nil {
return err
}
}
return nil
}
// shouldUpdateTable compares the billingMode and provisioned capacity of the existing table with the
// desired billingMode and capacity
func shouldUpdateTable(tableDescription *dynamodb.TableDescription, billingMode string, readCapacity, writeCapacity int) bool {
existingBillingMode := "PROVISIONED"
// the dynamodb service returns nil when PROVISIONED is the billingMode
// as it is the default
billingSummary := tableDescription.BillingModeSummary
if billingSummary != nil {
existingBillingMode = *(billingSummary.BillingMode)
}
if existingBillingMode != billingMode {
return true
}
provisionedThroughput := tableDescription.ProvisionedThroughput
if int64(readCapacity) != *(provisionedThroughput.ReadCapacityUnits) && int64(writeCapacity) != *(provisionedThroughput.WriteCapacityUnits) {
return true
}
return false
}
// recordPathForVaultKey transforms a vault key into
// a value suitable for the `DynamoDBRecord`'s `Path`
// property. This path equals the vault key without