[VAULT-22480] Add audit fallback device (#24583)

Co-authored-by: Peter Wilson <peter.wilson@hashicorp.com>
This commit is contained in:
Kuba Wieczorek
2024-01-08 13:57:43 +00:00
committed by GitHub
parent 6e537bb376
commit 2047ce7527
12 changed files with 888 additions and 19 deletions

View File

@@ -279,6 +279,11 @@ type Backend interface {
// nodes for node and pipeline registration.
event.PipelineReader
// IsFallback can be used to determine if this audit backend device is intended to
// be used as a fallback to catch all events that are not written when only using
// filtered pipelines.
IsFallback() bool
// LogRequest is used to synchronously log a request. This is done after the
// request is authorized but before the request is executed. The arguments
// MUST not be modified in any way. They should be deep copied if this is

View File

@@ -16,6 +16,7 @@ import (
"sync/atomic"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/go-secure-stdlib/parseutil"
"github.com/hashicorp/vault/audit"
"github.com/hashicorp/vault/internal/observability/event"
"github.com/hashicorp/vault/sdk/helper/salt"
@@ -36,6 +37,7 @@ var _ audit.Backend = (*Backend)(nil)
// or reset the write cursor, this should be done in the future.
type Backend struct {
f *os.File
fallback bool
fileLock sync.RWMutex
formatter *audit.EntryFormatterWriter
formatConfig audit.FormatterConfig
@@ -60,6 +62,21 @@ func Factory(_ context.Context, conf *audit.BackendConfig, useEventLogger bool,
return nil, fmt.Errorf("%s: nil salt view", op)
}
// The config options 'fallback' and 'filter' are mutually exclusive, a fallback
// device catches everything, so it cannot be allowed to filter.
var fallback bool
var err error
if fallbackRaw, ok := conf.Config["fallback"]; ok {
fallback, err = parseutil.ParseBool(fallbackRaw)
if err != nil {
return nil, fmt.Errorf("%s: unable to parse 'fallback': %w", op, err)
}
}
if _, ok := conf.Config["filter"]; ok && fallback {
return nil, fmt.Errorf("%s: cannot configure a fallback device with a filter: %w", op, event.ErrInvalidParameter)
}
// Get file path from config or fall back to the old option name ('path') for compatibility
// (see commit bac4fe0799a372ba1245db642f3f6cd1f1d02669).
var filePath string
@@ -106,6 +123,7 @@ func Factory(_ context.Context, conf *audit.BackendConfig, useEventLogger bool,
}
b := &Backend{
fallback: fallback,
filePath: filePath,
formatConfig: cfg,
mode: mode,
@@ -550,3 +568,10 @@ func (b *Backend) EventType() eventlogger.EventType {
func (b *Backend) HasFiltering() bool {
return len(b.nodeIDList) > 0 && b.nodeMap[b.nodeIDList[0]].Type() == eventlogger.NodeTypeFilter
}
// IsFallback can be used to determine if this audit backend device is intended to
// be used as a fallback to catch all events that are not written when only using
// filtered pipelines.
func (b *Backend) IsFallback() bool {
return b.fallback
}

View File

@@ -576,3 +576,129 @@ func TestBackend_configureFilterFormatterSink(t *testing.T) {
node = b.nodeMap[id]
require.Equal(t, eventlogger.NodeTypeSink, node.Type())
}
// TestBackend_Factory_Conf is used to ensure that any configuration which is
// supplied, is validated and tested.
func TestBackend_Factory_Conf(t *testing.T) {
t.Parallel()
ctx := context.Background()
tests := map[string]struct {
backendConfig *audit.BackendConfig
isErrorExpected bool
expectedErrorMessage string
}{
"nil-salt-config": {
backendConfig: &audit.BackendConfig{
SaltConfig: nil,
},
isErrorExpected: true,
expectedErrorMessage: "file.Factory: nil salt config",
},
"nil-salt-view": {
backendConfig: &audit.BackendConfig{
SaltConfig: &salt.Config{},
},
isErrorExpected: true,
expectedErrorMessage: "file.Factory: nil salt view",
},
"fallback-device-with-filter": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "true",
"file_path": discard,
"filter": "mount_type == kv",
},
},
isErrorExpected: true,
expectedErrorMessage: "file.Factory: cannot configure a fallback device with a filter: invalid parameter",
},
"non-fallback-device-with-filter": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "false",
"file_path": discard,
"filter": "mount_type == kv",
},
},
isErrorExpected: false,
},
}
for name, tc := range tests {
name := name
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
be, err := Factory(ctx, tc.backendConfig, true, nil)
switch {
case tc.isErrorExpected:
require.Error(t, err)
require.EqualError(t, err, tc.expectedErrorMessage)
default:
require.NoError(t, err)
require.NotNil(t, be)
}
})
}
}
// TestBackend_IsFallback ensures that the 'fallback' config setting is parsed
// and set correctly, then exposed via the interface method IsFallback().
func TestBackend_IsFallback(t *testing.T) {
t.Parallel()
ctx := context.Background()
tests := map[string]struct {
backendConfig *audit.BackendConfig
isFallbackExpected bool
}{
"fallback": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "true",
"file_path": discard,
},
},
isFallbackExpected: true,
},
"no-fallback": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "false",
"file_path": discard,
},
},
isFallbackExpected: false,
},
}
for name, tc := range tests {
name := name
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
be, err := Factory(ctx, tc.backendConfig, true, nil)
require.NoError(t, err)
require.NotNil(t, be)
require.Equal(t, tc.isFallbackExpected, be.IsFallback())
})
}
}

View File

@@ -29,6 +29,7 @@ type Backend struct {
sync.Mutex
address string
connection net.Conn
fallback bool
formatter *audit.EntryFormatterWriter
formatConfig audit.FormatterConfig
name string
@@ -73,12 +74,27 @@ func Factory(_ context.Context, conf *audit.BackendConfig, useEventLogger bool,
return nil, fmt.Errorf("%s: failed to parse 'write_timeout': %w", op, err)
}
// The config options 'fallback' and 'filter' are mutually exclusive, a fallback
// device catches everything, so it cannot be allowed to filter.
var fallback bool
if fallbackRaw, ok := conf.Config["fallback"]; ok {
fallback, err = parseutil.ParseBool(fallbackRaw)
if err != nil {
return nil, fmt.Errorf("%s: unable to parse 'fallback': %w", op, err)
}
}
if _, ok := conf.Config["filter"]; ok && fallback {
return nil, fmt.Errorf("%s: cannot configure a fallback device with a filter: %w", op, event.ErrInvalidParameter)
}
cfg, err := formatterConfig(conf.Config)
if err != nil {
return nil, fmt.Errorf("%s: failed to create formatter config: %w", op, err)
}
b := &Backend{
fallback: fallback,
address: address,
formatConfig: cfg,
name: conf.MountPath,
@@ -443,3 +459,10 @@ func (b *Backend) EventType() eventlogger.EventType {
func (b *Backend) HasFiltering() bool {
return len(b.nodeIDList) > 0 && b.nodeMap[b.nodeIDList[0]].Type() == eventlogger.NodeTypeFilter
}
// IsFallback can be used to determine if this audit backend device is intended to
// be used as a fallback to catch all events that are not written when only using
// filtered pipelines.
func (b *Backend) IsFallback() bool {
return b.fallback
}

View File

@@ -4,10 +4,13 @@
package socket
import (
"context"
"testing"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/vault/audit"
"github.com/hashicorp/vault/sdk/helper/salt"
"github.com/hashicorp/vault/sdk/logical"
"github.com/stretchr/testify/require"
)
@@ -329,3 +332,192 @@ func TestBackend_configureFilterFormatterSink(t *testing.T) {
node = b.nodeMap[id]
require.Equal(t, eventlogger.NodeTypeSink, node.Type())
}
// TestBackend_Factory_Conf is used to ensure that any configuration which is
// supplied, is validated and tested.
func TestBackend_Factory_Conf(t *testing.T) {
t.Parallel()
ctx := context.Background()
tests := map[string]struct {
backendConfig *audit.BackendConfig
isErrorExpected bool
expectedErrorMessage string
}{
"nil-salt-config": {
backendConfig: &audit.BackendConfig{
SaltConfig: nil,
},
isErrorExpected: true,
expectedErrorMessage: "socket.Factory: nil salt config",
},
"nil-salt-view": {
backendConfig: &audit.BackendConfig{
SaltConfig: &salt.Config{},
},
isErrorExpected: true,
expectedErrorMessage: "socket.Factory: nil salt view",
},
"no-address": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{},
},
isErrorExpected: true,
expectedErrorMessage: "socket.Factory: address is required",
},
"empty-address": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"address": "",
},
},
isErrorExpected: true,
expectedErrorMessage: "socket.Factory: error configuring sink node: socket.(Backend).configureSinkNode: address is required: invalid parameter",
},
"whitespace-address": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"address": " ",
},
},
isErrorExpected: true,
expectedErrorMessage: "socket.Factory: error configuring sink node: socket.(Backend).configureSinkNode: address is required: invalid parameter",
},
"write-duration-valid": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"address": "hashicorp.com",
"write_timeout": "5s",
},
},
isErrorExpected: false,
},
"write-duration-not-valid": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"address": "hashicorp.com",
"write_timeout": "qwerty",
},
},
isErrorExpected: true,
expectedErrorMessage: "socket.Factory: failed to parse 'write_timeout': time: invalid duration \"qwerty\"",
},
"non-fallback-device-with-filter": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"address": "hashicorp.com",
"write_timeout": "5s",
"fallback": "false",
"filter": "mount_type == kv",
},
},
isErrorExpected: false,
},
"fallback-device-with-filter": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"address": "hashicorp.com",
"write_timeout": "2s",
"fallback": "true",
"filter": "mount_type == kv",
},
},
isErrorExpected: true,
expectedErrorMessage: "socket.Factory: cannot configure a fallback device with a filter: invalid parameter",
},
}
for name, tc := range tests {
name := name
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
be, err := Factory(ctx, tc.backendConfig, true, nil)
switch {
case tc.isErrorExpected:
require.Error(t, err)
require.EqualError(t, err, tc.expectedErrorMessage)
default:
require.NoError(t, err)
require.NotNil(t, be)
}
})
}
}
// TestBackend_IsFallback ensures that the 'fallback' config setting is parsed
// and set correctly, then exposed via the interface method IsFallback().
func TestBackend_IsFallback(t *testing.T) {
t.Parallel()
ctx := context.Background()
tests := map[string]struct {
backendConfig *audit.BackendConfig
isFallbackExpected bool
}{
"fallback": {
backendConfig: &audit.BackendConfig{
MountPath: "qwerty",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "true",
"address": "hashicorp.com",
"write_timeout": "5s",
},
},
isFallbackExpected: true,
},
"no-fallback": {
backendConfig: &audit.BackendConfig{
MountPath: "qwerty",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "false",
"address": "hashicorp.com",
"write_timeout": "5s",
},
},
isFallbackExpected: false,
},
}
for name, tc := range tests {
name := name
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
be, err := Factory(ctx, tc.backendConfig, true, nil)
require.NoError(t, err)
require.NotNil(t, be)
require.Equal(t, tc.isFallbackExpected, be.IsFallback())
})
}
}

View File

@@ -12,6 +12,7 @@ import (
"sync"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/go-secure-stdlib/parseutil"
gsyslog "github.com/hashicorp/go-syslog"
"github.com/hashicorp/vault/audit"
"github.com/hashicorp/vault/internal/observability/event"
@@ -23,6 +24,7 @@ var _ audit.Backend = (*Backend)(nil)
// Backend is the audit backend for the syslog-based audit store.
type Backend struct {
fallback bool
formatter *audit.EntryFormatterWriter
formatConfig audit.FormatterConfig
logger gsyslog.Syslogger
@@ -58,6 +60,21 @@ func Factory(_ context.Context, conf *audit.BackendConfig, useEventLogger bool,
tag = "vault"
}
// The config options 'fallback' and 'filter' are mutually exclusive, a fallback
// device catches everything, so it cannot be allowed to filter.
var fallback bool
var err error
if fallbackRaw, ok := conf.Config["fallback"]; ok {
fallback, err = parseutil.ParseBool(fallbackRaw)
if err != nil {
return nil, fmt.Errorf("%s: unable to parse 'fallback': %w", op, err)
}
}
if _, ok := conf.Config["filter"]; ok && fallback {
return nil, fmt.Errorf("%s: cannot configure a fallback device with a filter: %w", op, event.ErrInvalidParameter)
}
cfg, err := formatterConfig(conf.Config)
if err != nil {
return nil, fmt.Errorf("%s: failed to create formatter config: %w", op, err)
@@ -70,6 +87,7 @@ func Factory(_ context.Context, conf *audit.BackendConfig, useEventLogger bool,
}
b := &Backend{
fallback: fallback,
formatConfig: cfg,
logger: logger,
name: conf.MountPath,
@@ -347,3 +365,10 @@ func (b *Backend) EventType() eventlogger.EventType {
func (b *Backend) HasFiltering() bool {
return len(b.nodeIDList) > 0 && b.nodeMap[b.nodeIDList[0]].Type() == eventlogger.NodeTypeFilter
}
// IsFallback can be used to determine if this audit backend device is intended to
// be used as a fallback to catch all events that are not written when only using
// filtered pipelines.
func (b *Backend) IsFallback() bool {
return b.fallback
}

View File

@@ -4,10 +4,13 @@
package syslog
import (
"context"
"testing"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/vault/audit"
"github.com/hashicorp/vault/sdk/helper/salt"
"github.com/hashicorp/vault/sdk/logical"
"github.com/stretchr/testify/require"
)
@@ -311,3 +314,125 @@ func TestBackend_configureFilterFormatterSink(t *testing.T) {
node = b.nodeMap[id]
require.Equal(t, eventlogger.NodeTypeSink, node.Type())
}
// TestBackend_Factory_Conf is used to ensure that any configuration which is
// supplied, is validated and tested.
func TestBackend_Factory_Conf(t *testing.T) {
t.Parallel()
ctx := context.Background()
tests := map[string]struct {
backendConfig *audit.BackendConfig
isErrorExpected bool
expectedErrorMessage string
}{
"nil-salt-config": {
backendConfig: &audit.BackendConfig{
SaltConfig: nil,
},
isErrorExpected: true,
expectedErrorMessage: "syslog.Factory: nil salt config",
},
"nil-salt-view": {
backendConfig: &audit.BackendConfig{
SaltConfig: &salt.Config{},
},
isErrorExpected: true,
expectedErrorMessage: "syslog.Factory: nil salt view",
},
"non-fallback-device-with-filter": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "false",
"filter": "mount_type == kv",
},
},
isErrorExpected: false,
},
"fallback-device-with-filter": {
backendConfig: &audit.BackendConfig{
MountPath: "discard",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "true",
"filter": "mount_type == kv",
},
},
isErrorExpected: true,
expectedErrorMessage: "syslog.Factory: cannot configure a fallback device with a filter: invalid parameter",
},
}
for name, tc := range tests {
name := name
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
be, err := Factory(ctx, tc.backendConfig, true, nil)
switch {
case tc.isErrorExpected:
require.Error(t, err)
require.EqualError(t, err, tc.expectedErrorMessage)
default:
require.NoError(t, err)
require.NotNil(t, be)
}
})
}
}
// TestBackend_IsFallback ensures that the 'fallback' config setting is parsed
// and set correctly, then exposed via the interface method IsFallback().
func TestBackend_IsFallback(t *testing.T) {
t.Parallel()
ctx := context.Background()
tests := map[string]struct {
backendConfig *audit.BackendConfig
isFallbackExpected bool
}{
"fallback": {
backendConfig: &audit.BackendConfig{
MountPath: "qwerty",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "true",
},
},
isFallbackExpected: true,
},
"no-fallback": {
backendConfig: &audit.BackendConfig{
MountPath: "qwerty",
SaltConfig: &salt.Config{},
SaltView: &logical.InmemStorage{},
Config: map[string]string{
"fallback": "false",
},
},
isFallbackExpected: false,
},
}
for name, tc := range tests {
name := name
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
be, err := Factory(ctx, tc.backendConfig, true, nil)
require.NoError(t, err)
require.NotNil(t, be)
require.Equal(t, tc.isFallbackExpected, be.IsFallback())
})
}
}

View File

@@ -535,7 +535,7 @@ func (n *NoopAudit) Invalidate(_ context.Context) {
// the audit.Backend interface.
func (n *NoopAudit) RegisterNodesAndPipeline(broker *eventlogger.Broker, name string) error {
for id, node := range n.nodeMap {
if err := broker.RegisterNode(id, node, eventlogger.WithNodeRegistrationPolicy(eventlogger.DenyOverwrite)); err != nil {
if err := broker.RegisterNode(id, node); err != nil {
return err
}
}
@@ -546,7 +546,7 @@ func (n *NoopAudit) RegisterNodesAndPipeline(broker *eventlogger.Broker, name st
NodeIDs: n.nodeIDList,
}
return broker.RegisterPipeline(pipeline, eventlogger.WithPipelineRegistrationPolicy(eventlogger.DenyOverwrite))
return broker.RegisterPipeline(pipeline)
}
type TestLogger struct {
@@ -632,3 +632,7 @@ func (n *NoopAudit) Nodes() map[eventlogger.NodeID]eventlogger.Node {
func (n *NoopAudit) NodeIDs() []eventlogger.NodeID {
return n.nodeIDList
}
func (n *NoopAudit) IsFallback() bool {
return false
}

View File

@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"
@@ -81,6 +82,17 @@ func (c *Core) enableAudit(ctx context.Context, entry *MountEntry, updateStorage
return fmt.Errorf("backend path must be specified")
}
if fallbackRaw, ok := entry.Options["fallback"]; ok {
fallback, err := parseutil.ParseBool(fallbackRaw)
if err != nil {
return fmt.Errorf("unable to enable audit device '%s', cannot parse supplied 'fallback' setting: %w", entry.Path, err)
}
// Reassigning the fallback value means we can ensure that the formatting
// of it as a string is consistent for future comparisons.
entry.Options["fallback"] = strconv.FormatBool(fallback)
}
// Update the audit table
c.auditLock.Lock()
defer c.auditLock.Unlock()
@@ -88,6 +100,8 @@ func (c *Core) enableAudit(ctx context.Context, entry *MountEntry, updateStorage
// Look for matching name
for _, ent := range c.audit.Entries {
switch {
case entry.Options["fallback"] == "true" && ent.Options["fallback"] == "true":
return fmt.Errorf("unable to enable audit device '%s', a fallback device already exists '%s'", entry.Path, ent.Path)
// Existing is sql/mysql/ new is sql/ or
// existing is sql/ and new is sql/mysql/
case strings.HasPrefix(ent.Path, entry.Path):
@@ -531,7 +545,7 @@ func (c *Core) newAuditBackend(ctx context.Context, entry *MountEntry, view logi
!disableEventLogger,
c.auditedHeaders)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to create new audit backend: %w", err)
}
if be == nil {
return nil, fmt.Errorf("nil backend returned from %q factory function", entry.Type)

View File

@@ -12,9 +12,9 @@ import (
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-metrics"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/vault/audit"
"github.com/hashicorp/vault/helper/namespace"
@@ -32,32 +32,52 @@ type backendEntry struct {
type AuditBroker struct {
sync.RWMutex
backends map[string]backendEntry
logger hclog.Logger
// broker is used to register pipelines for all devices except a fallback device.
broker *eventlogger.Broker
// fallbackBroker is used to register a pipeline to be used as a fallback
// in situations where we cannot use the eventlogger.Broker to guarantee that
// the required number of sinks were successfully written to. This situation
// occurs when all the audit devices registered with the broker use filtering.
// NOTE: there should only ever be a single device registered on the fallbackBroker.
fallbackBroker *eventlogger.Broker
// fallbackName stores the name (path) of the audit device which has been configured
// as the fallback pipeline (its eventlogger.PipelineID).
fallbackName string
logger hclog.Logger
}
// NewAuditBroker creates a new audit broker
func NewAuditBroker(log hclog.Logger, useEventLogger bool) (*AuditBroker, error) {
var eventBroker *eventlogger.Broker
var fallbackBroker *eventlogger.Broker
var err error
// The reason for this check is due to 1.15.x supporting the env var:
// 'VAULT_AUDIT_DISABLE_EVENTLOGGER'
// When NewAuditBroker is called, it is supplied a bool to determine whether
// we initialize the broker, which are left nil otherwise.
// we initialize the broker (and fallback broker), which are left nil otherwise.
// In 1.16.x this check should go away and the env var removed.
if useEventLogger {
eventBroker, err = eventlogger.NewBroker()
if err != nil {
return nil, fmt.Errorf("error creating event broker for audit events: %w", err)
}
// Set up the broker that will support a single fallback device.
fallbackBroker, err = eventlogger.NewBroker()
if err != nil {
return nil, fmt.Errorf("error creating event fallback broker for audit event: %w", err)
}
}
b := &AuditBroker{
backends: make(map[string]backendEntry),
logger: log,
broker: eventBroker,
backends: make(map[string]backendEntry),
logger: log,
broker: eventBroker,
fallbackBroker: fallbackBroker,
}
return b, nil
}
@@ -74,19 +94,42 @@ func (a *AuditBroker) Register(name string, b audit.Backend, local bool) error {
return fmt.Errorf("%s: name is required: %w", op, event.ErrInvalidParameter)
}
// If the backend is already registered, we cannot re-register it.
if a.isRegistered(name) {
return fmt.Errorf("%s: backend already registered '%s'", op, name)
}
// Fallback devices are singleton instances, we cannot register more than one or overwrite the existing one.
if b.IsFallback() && a.fallbackBroker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())) {
existing, err := a.existingFallbackName()
if err != nil {
return fmt.Errorf("%s: existing fallback device already registered: %w", op, err)
}
return fmt.Errorf("%s: existing fallback device already registered: %q", op, existing)
}
// The reason for this check is due to 1.15.x supporting the env var:
// 'VAULT_AUDIT_DISABLE_EVENTLOGGER'
// When NewAuditBroker is called, it is supplied a bool to determine whether
// we initialize the broker, which are left nil otherwise.
// we initialize the broker (and fallback broker), which are left nil otherwise.
// In 1.16.x this check should go away and the env var removed.
if a.broker != nil {
if name != b.Name() {
return fmt.Errorf("%s: audit registration failed due to device name mismatch: %q, %q", op, name, b.Name())
}
err := a.register(name, b)
if err != nil {
return fmt.Errorf("%s: unable to register device for %q: %w", op, name, err)
switch {
case b.IsFallback():
err := a.registerFallback(name, b)
if err != nil {
return fmt.Errorf("%s: unable to register fallback device for %q: %w", op, name, err)
}
default:
err := a.register(name, b)
if err != nil {
return fmt.Errorf("%s: unable to register device for %q: %w", op, name, err)
}
}
}
@@ -110,6 +153,12 @@ func (a *AuditBroker) Deregister(ctx context.Context, name string) error {
return fmt.Errorf("%s: name is required: %w", op, event.ErrInvalidParameter)
}
// If the backend isn't actually registered, then there's nothing to do.
// We don't return any error so that Deregister can be idempotent.
if !a.isRegistered(name) {
return nil
}
// Remove the Backend from the map first, so that if an error occurs while
// removing the pipeline and nodes, we can quickly exit this method with
// the error.
@@ -118,23 +167,37 @@ func (a *AuditBroker) Deregister(ctx context.Context, name string) error {
// The reason for this check is due to 1.15.x supporting the env var:
// 'VAULT_AUDIT_DISABLE_EVENTLOGGER'
// When NewAuditBroker is called, it is supplied a bool to determine whether
// we initialize the broker, which are left nil otherwise.
// we initialize the broker (and fallback broker), which are left nil otherwise.
// In 1.16.x this check should go away and the env var removed.
if a.broker != nil {
err := a.deregister(ctx, name)
if err != nil {
return fmt.Errorf("%s: deregistration failed for audit device %q: %w", op, name, err)
switch {
case name == a.fallbackName:
err := a.deregisterFallback(ctx, name)
if err != nil {
return fmt.Errorf("%s: deregistration failed for fallback audit device %q: %w", op, name, err)
}
default:
err := a.deregister(ctx, name)
if err != nil {
return fmt.Errorf("%s: deregistration failed for audit device %q: %w", op, name, err)
}
}
}
return nil
}
// IsRegistered is used to check if a given audit backend is registered
// IsRegistered is used to check if a given audit backend is registered.
func (a *AuditBroker) IsRegistered(name string) bool {
a.RLock()
defer a.RUnlock()
return a.isRegistered(name)
}
// isRegistered is used to check if a given audit backend is registered.
// This method should be used within the AuditBroker to prevent locking issues.
func (a *AuditBroker) isRegistered(name string) bool {
_, ok := a.backends[name]
return ok
}
@@ -236,6 +299,9 @@ func (a *AuditBroker) LogRequest(ctx context.Context, in *logical.LogInput, head
e.Data = in
// There may be cases where only the fallback device was added but no other
// normal audit devices, so check if the broker had an audit based pipeline
// registered before trying to send to it.
var status eventlogger.Status
if a.broker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())) {
status, err = a.broker.Send(ctx, eventlogger.EventType(event.AuditType.String()), e)
@@ -255,6 +321,15 @@ func (a *AuditBroker) LogRequest(ctx context.Context, in *logical.LogInput, head
retErr = multierror.Append(retErr, multierror.Append(errors.New("error during audit pipeline processing"), status.Warnings...))
return retErr.ErrorOrNil()
}
// If a fallback device is registered we can rely on that to 'catch all'
// and also the broker level guarantee for completed sinks.
if a.fallbackBroker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())) {
status, err = a.fallbackBroker.Send(ctx, eventlogger.EventType(event.AuditType.String()), e)
if err != nil {
retErr = multierror.Append(retErr, multierror.Append(fmt.Errorf("auditing request to fallback device failed: %w", err), status.Warnings...))
}
}
}
}
@@ -349,6 +424,9 @@ func (a *AuditBroker) LogResponse(ctx context.Context, in *logical.LogInput, hea
defer auditCancel()
auditContext = namespace.ContextWithNamespace(auditContext, ns)
// There may be cases where only the fallback device was added but no other
// normal audit devices, so check if the broker had an audit based pipeline
// registered before trying to send to it.
var status eventlogger.Status
if a.broker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())) {
status, err = a.broker.Send(auditContext, eventlogger.EventType(event.AuditType.String()), e)
@@ -368,6 +446,15 @@ func (a *AuditBroker) LogResponse(ctx context.Context, in *logical.LogInput, hea
retErr = multierror.Append(retErr, multierror.Append(errors.New("error during audit pipeline processing"), status.Warnings...))
return retErr.ErrorOrNil()
}
// If a fallback device is registered we can rely on that to 'catch all'
// and also the broker level guarantee for completed sinks.
if a.fallbackBroker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())) {
status, err = a.fallbackBroker.Send(auditContext, eventlogger.EventType(event.AuditType.String()), e)
if err != nil {
retErr = multierror.Append(retErr, multierror.Append(fmt.Errorf("auditing response to fallback device failed: %w", err), status.Warnings...))
}
}
}
}
@@ -391,13 +478,19 @@ func (a *AuditBroker) Invalidate(ctx context.Context, key string) {
// guarantee provided by setting the threshold to 1, and must set it to 0.
// If you are registering an audit device, you should first check if that backend
// does not have filtering before querying the backends via requiredSuccessThresholdSinks.
// backends may also contain a fallback device, which should be ignored as it is
// handled by the fallbackBroker.
func (a *AuditBroker) requiredSuccessThresholdSinks() int {
threshold := 0
// We might need to check over all the existing backends to discover if any
// don't use filtering.
for _, be := range a.backends {
if !be.backend.HasFiltering() {
switch {
case be.backend.IsFallback():
// Ignore fallback devices as they're handled by a separate broker.
continue
case !be.backend.HasFiltering():
threshold = 1
break
}
@@ -432,6 +525,65 @@ func registerNodesAndPipeline(broker *eventlogger.Broker, b audit.Backend) error
return nil
}
// existingFallbackName returns the name of the fallback device which is registered
// with the AuditBroker.
func (a *AuditBroker) existingFallbackName() (string, error) {
const op = "vault.(AuditBroker).existingFallbackName"
for _, be := range a.backends {
if be.backend.IsFallback() {
return be.backend.Name(), nil
}
}
return "", fmt.Errorf("%s: existing fallback device name is missing", op)
}
// registerFallback can be used to register a fallback device, it will also
// configure the success threshold required for sinks.
func (a *AuditBroker) registerFallback(name string, backend audit.Backend) error {
const op = "vault.(AuditBroker).registerFallback"
err := registerNodesAndPipeline(a.fallbackBroker, backend)
if err != nil {
return fmt.Errorf("%s: fallback device pipeline registration error: %w", op, err)
}
// Store the name of the fallback audit device so that we can check when
// deregistering if the device is the single fallback one.
a.fallbackName = backend.Name()
// We need to turn on the threshold for the fallback broker, so we can
// guarantee it ends up somewhere
err = a.fallbackBroker.SetSuccessThresholdSinks(eventlogger.EventType(event.AuditType.String()), 1)
if err != nil {
return fmt.Errorf("%s: unable to configure fallback sink success threshold (1) for %q: %w", op, name, err)
}
return nil
}
// deregisterFallback can be used to deregister a fallback audit device, it will
// also configure the success threshold required for sinks.
func (a *AuditBroker) deregisterFallback(ctx context.Context, name string) error {
const op = "vault.(AuditBroker).deregisterFallback"
err := a.fallbackBroker.SetSuccessThresholdSinks(eventlogger.EventType(event.AuditType.String()), 0)
if err != nil {
return fmt.Errorf("%s: unable to configure fallback sink success threshold (0) for %q: %w", op, name, err)
}
_, err = a.fallbackBroker.RemovePipelineAndNodes(ctx, eventlogger.EventType(event.AuditType.String()), eventlogger.PipelineID(name))
if err != nil {
return fmt.Errorf("%s: unable to deregister fallback device %q: %w", op, name, err)
}
// Clear the fallback device name now we've deregistered.
a.fallbackName = ""
return nil
}
// register can be used to register a normal audit device, it will also calculate
// and configure the success threshold required for sinks.
func (a *AuditBroker) register(name string, backend audit.Backend) error {

View File

@@ -141,3 +141,118 @@ func TestAuditBroker_Deregister_SuccessThresholdSinks(t *testing.T) {
require.True(t, ok)
require.Equal(t, 1, res)
}
// TestAuditBroker_Register_Fallback ensures we can register a fallback device.
func TestAuditBroker_Register_Fallback(t *testing.T) {
t.Parallel()
l := corehelpers.NewTestLogger(t)
a, err := NewAuditBroker(l, true)
require.NoError(t, err)
require.NotNil(t, a)
path := "juan/"
fallbackBackend := testAuditBackend(t, path, map[string]string{"fallback": "true"})
err = a.Register(path, fallbackBackend, false)
require.NoError(t, err)
require.True(t, a.fallbackBroker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())))
require.Equal(t, path, a.fallbackName)
threshold, found := a.fallbackBroker.SuccessThresholdSinks(eventlogger.EventType(event.AuditType.String()))
require.True(t, found)
require.Equal(t, 1, threshold)
}
// TestAuditBroker_Register_FallbackMultiple tests that trying to register more
// than a single fallback device results in the correct error.
func TestAuditBroker_Register_FallbackMultiple(t *testing.T) {
t.Parallel()
l := corehelpers.NewTestLogger(t)
a, err := NewAuditBroker(l, true)
require.NoError(t, err)
require.NotNil(t, a)
path1 := "juan1/"
fallbackBackend1 := testAuditBackend(t, path1, map[string]string{"fallback": "true"})
err = a.Register(path1, fallbackBackend1, false)
require.NoError(t, err)
require.True(t, a.fallbackBroker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())))
require.Equal(t, path1, a.fallbackName)
path2 := "juan2/"
fallbackBackend2 := testAuditBackend(t, path2, map[string]string{"fallback": "true"})
err = a.Register(path1, fallbackBackend2, false)
require.Error(t, err)
require.EqualError(t, err, "vault.(AuditBroker).Register: backend already registered 'juan1/'")
require.True(t, a.fallbackBroker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())))
require.Equal(t, path1, a.fallbackName)
}
// TestAuditBroker_Deregister_Fallback ensures that we can deregister a fallback
// device successfully.
func TestAuditBroker_Deregister_Fallback(t *testing.T) {
t.Parallel()
l := corehelpers.NewTestLogger(t)
a, err := NewAuditBroker(l, true)
require.NoError(t, err)
require.NotNil(t, a)
path := "juan/"
fallbackBackend := testAuditBackend(t, path, map[string]string{"fallback": "true"})
err = a.Register(path, fallbackBackend, false)
require.NoError(t, err)
require.True(t, a.fallbackBroker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())))
require.Equal(t, path, a.fallbackName)
threshold, found := a.fallbackBroker.SuccessThresholdSinks(eventlogger.EventType(event.AuditType.String()))
require.True(t, found)
require.Equal(t, 1, threshold)
err = a.Deregister(context.Background(), path)
require.NoError(t, err)
require.False(t, a.fallbackBroker.IsAnyPipelineRegistered(eventlogger.EventType(event.AuditType.String())))
require.Equal(t, "", a.fallbackName)
threshold, found = a.fallbackBroker.SuccessThresholdSinks(eventlogger.EventType(event.AuditType.String()))
require.True(t, found)
require.Equal(t, 0, threshold)
}
// TestAuditBroker_Deregister_Multiple ensures that we can call deregister multiple
// times without issue if is no matching backend registered.
func TestAuditBroker_Deregister_Multiple(t *testing.T) {
t.Parallel()
l := corehelpers.NewTestLogger(t)
a, err := NewAuditBroker(l, true)
require.NoError(t, err)
require.NotNil(t, a)
err = a.Deregister(context.Background(), "foo")
require.NoError(t, err)
err = a.Deregister(context.Background(), "foo2")
require.NoError(t, err)
}
// TestAuditBroker_Register_MultipleFails checks for failure when we try to
// re-register an audit backend.
func TestAuditBroker_Register_MultipleFails(t *testing.T) {
t.Parallel()
l := corehelpers.NewTestLogger(t)
a, err := NewAuditBroker(l, true)
require.NoError(t, err)
require.NotNil(t, a)
path := "b2-no-filter"
noFilterBackend := testAuditBackend(t, path, map[string]string{})
err = a.Register(path, noFilterBackend, false)
require.NoError(t, err)
err = a.Register(path, noFilterBackend, false)
require.Error(t, err)
require.EqualError(t, err, "vault.(AuditBroker).Register: backend already registered 'b2-no-filter'")
}

View File

@@ -237,6 +237,69 @@ func TestCore_EnableAudit_Local(t *testing.T) {
}
}
// TestAudit_enableAudit_fallback_invalid ensures that supplying a bad value for
// 'fallback' in options gives us the correct error.
func TestAudit_enableAudit_fallback_invalid(t *testing.T) {
entry := &MountEntry{
Path: "noop/",
Options: map[string]string{
"fallback": "juan",
},
}
cluster := NewTestCluster(t, nil, nil)
cluster.Start()
defer cluster.Cleanup()
core := cluster.Cores[0]
core.auditBackends["noop"] = corehelpers.NoopAuditFactory(nil)
err := core.enableAudit(context.Background(), entry, false)
require.Error(t, err)
require.EqualError(t, err, "unable to enable audit device 'noop/', cannot parse supplied 'fallback' setting: cannot parse '' as bool: strconv.ParseBool: parsing \"juan\": invalid syntax")
}
// TestAudit_enableAudit_fallback_two ensures trying to enable a second fallback
// device returns the correct error.
func TestAudit_enableAudit_fallback_two(t *testing.T) {
entry1 := &MountEntry{
Table: auditTableType,
Path: "noop1/",
Type: "noop",
UUID: "abcd",
Accessor: "noop1-abcd",
NamespaceID: namespace.RootNamespaceID,
Options: map[string]string{
"fallback": "TRUE",
},
namespace: namespace.RootNamespace,
}
entry2 := &MountEntry{
Table: auditTableType,
Path: "noop2/",
Type: "noop",
UUID: "abcd",
Accessor: "noop2-abcd",
NamespaceID: namespace.RootNamespaceID,
Options: map[string]string{
"fallback": "1",
},
namespace: namespace.RootNamespace,
}
cluster := NewTestCluster(t, nil, nil)
cluster.Start()
defer cluster.Cleanup()
core := cluster.Cores[0]
core.auditBackends["noop"] = corehelpers.NoopAuditFactory(nil)
ctx := namespace.ContextWithNamespace(context.Background(), namespace.RootNamespace)
err := core.enableAudit(ctx, entry1, false)
require.NoError(t, err)
err = core.enableAudit(ctx, entry2, false)
require.Error(t, err)
require.EqualError(t, err, "unable to enable audit device 'noop2/', a fallback device already exists 'noop1/'")
}
func TestCore_DisableAudit(t *testing.T) {
c, keys, _ := TestCoreUnsealed(t)
c.auditBackends["noop"] = corehelpers.NoopAuditFactory(nil)