mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-11-01 19:17:58 +00:00
VAULT-22482: New audit metrics (#24704)
* VAULT-22481: Audit filter node (#24465) * Initial commit on adding filter nodes for audit * tests for audit filter * test: longer filter - more conditions * copywrite headers * Check interface for the right type * Add audit filtering feature (#24554) * Support filter nodes in backend factories and add some tests * More tests and cleanup * Attempt to move control of registration for nodes and pipelines to the audit broker (#24505) * invert control of the pipelines/nodes to the audit broker vs. within each backend * update noop audit test code to implement the pipeliner interface * noop mount path has trailing slash * attempting to make NoopAudit more friendly * NoopAudit uses known salt * Refactor audit.ProcessManual to support filter nodes * HasFiltering * rename the pipeliner * use exported AuditEvent in Filter * Add tests for registering and deregistering backends on the audit broker * Add missing licence header to one file, fix a typo in two tests --------- Co-authored-by: Peter Wilson <peter.wilson@hashicorp.com> * Add changelog file * initial work on global metrics for sink success/failure * initial work to add a fallback device for audit * Return when we have outright errors * Improve comment * Remove unneeded options on NewBroker and remove the policy opts elsewhere * Remove duplicate node registration code * Add more tests for audit backends * ensure we return the multierror as soon as possible, and append it correctly * error tweaks for audit: log req/resp * extract the registration for fallback/normal devices, and ensure we always add to backends when successful * slightly nicer error message rather than returning the raw err * refactor the deregister methods for audit broker * Prevent issues if fallback device is the first device added * Bail early when the user tries adding more than one fallback audit device * Check if there is an existing fallback audit device when setting the required sinks threshold for an audit broker * Use the right ParseBool in audit backends * Tweak the way we check for the threshold to make it clear why we ignore fallback * Ensure all 'fallback' settings look the same * nicer formatting of error * broker tests for Register * Deregister tests * Deregister checks if registered before attempting * Comment improvement * Multiple Deregister calls are OK * Fallback not required in this test * Sanitise input for Deregister * Locking mixup * fix test * Add changelog * Check fallback broker's sink success threshold for register/deregister * Remove changelog * updated * better name for the audit metrics labelers * extra test * remove name from metric counter type * update func calls for NewMetricsCounter * labelers should be pointers to the instance * revert audit_test complaints about the header * use constant value for the metric label on a fallback miss * remove vault prefix from metric labels * US spelling for labeler and adjust the way the labels are returned * Fixed name and type we're testing for * Defensive addition to HasFiltering (no nodemap no filter node) * Remove dupe code block * Revert to using armon/go-metrics * Fallback miss fix * PR feedback updates * consistent format for configure methods * Updated telemetry set up based on PR feedback --------- Co-authored-by: Kuba Wieczorek <kuba.wieczorek@hashicorp.com>
This commit is contained in:
@@ -54,6 +54,7 @@ func TestEntryFilter_NewEntryFilter(t *testing.T) {
|
|||||||
f, err := NewEntryFilter(tc.Filter)
|
f, err := NewEntryFilter(tc.Filter)
|
||||||
switch {
|
switch {
|
||||||
case tc.IsErrorExpected:
|
case tc.IsErrorExpected:
|
||||||
|
require.Error(t, err)
|
||||||
require.ErrorContains(t, err, tc.ExpectedErrorMessage)
|
require.ErrorContains(t, err, tc.ExpectedErrorMessage)
|
||||||
require.Nil(t, f)
|
require.Nil(t, f)
|
||||||
default:
|
default:
|
||||||
|
|||||||
57
audit/sink_metric_labeler.go
Normal file
57
audit/sink_metric_labeler.go
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
// Copyright (c) HashiCorp, Inc.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/eventlogger"
|
||||||
|
"github.com/hashicorp/vault/internal/observability/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ event.Labeler = (*MetricLabelerAuditSink)(nil)
|
||||||
|
_ event.Labeler = (*MetricLabelerAuditFallback)(nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
metricLabelAuditSinkSuccess = []string{"audit", "sink", "success"}
|
||||||
|
metricLabelAuditSinkFailure = []string{"audit", "sink", "failure"}
|
||||||
|
metricLabelAuditFallbackSuccess = []string{"audit", "fallback", "success"}
|
||||||
|
metricLabelAuditFallbackMiss = []string{"audit", "fallback", "miss"}
|
||||||
|
)
|
||||||
|
|
||||||
|
// MetricLabelerAuditSink can be used to provide labels for the success or failure
|
||||||
|
// of a sink node used for a normal audit device.
|
||||||
|
type MetricLabelerAuditSink struct{}
|
||||||
|
|
||||||
|
// MetricLabelerAuditFallback can be used to provide labels for the success or failure
|
||||||
|
// of a sink node used for an audit fallback device.
|
||||||
|
type MetricLabelerAuditFallback struct{}
|
||||||
|
|
||||||
|
// Labels provides the success and failure labels for an audit sink, based on the error supplied.
|
||||||
|
// Success: 'vault.audit.sink.success'
|
||||||
|
// Failure: 'vault.audit.sink.failure'
|
||||||
|
func (m MetricLabelerAuditSink) Labels(_ *eventlogger.Event, err error) []string {
|
||||||
|
if err != nil {
|
||||||
|
return metricLabelAuditSinkFailure
|
||||||
|
}
|
||||||
|
|
||||||
|
return metricLabelAuditSinkSuccess
|
||||||
|
}
|
||||||
|
|
||||||
|
// Labels provides the success and failures labels for an audit fallback sink, based on the error supplied.
|
||||||
|
// Success: 'vault.audit.fallback.success'
|
||||||
|
// Failure: 'vault.audit.sink.failure'
|
||||||
|
func (m MetricLabelerAuditFallback) Labels(_ *eventlogger.Event, err error) []string {
|
||||||
|
if err != nil {
|
||||||
|
return metricLabelAuditSinkFailure
|
||||||
|
}
|
||||||
|
|
||||||
|
return metricLabelAuditFallbackSuccess
|
||||||
|
}
|
||||||
|
|
||||||
|
// MetricLabelsFallbackMiss returns the labels which indicate an audit entry was missed.
|
||||||
|
// 'vault.audit.fallback.miss'
|
||||||
|
func MetricLabelsFallbackMiss() []string {
|
||||||
|
return metricLabelAuditFallbackMiss
|
||||||
|
}
|
||||||
75
audit/sink_metric_labeler_test.go
Normal file
75
audit/sink_metric_labeler_test.go
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
// Copyright (c) HashiCorp, Inc.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestMetricLabelerAuditSink_Label ensures we always get the right label based
|
||||||
|
// on the input value of the error.
|
||||||
|
func TestMetricLabelerAuditSink_Label(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
tests := map[string]struct {
|
||||||
|
err error
|
||||||
|
expected []string
|
||||||
|
}{
|
||||||
|
"nil": {
|
||||||
|
err: nil,
|
||||||
|
expected: []string{"audit", "sink", "success"},
|
||||||
|
},
|
||||||
|
"error": {
|
||||||
|
err: errors.New("I am an error"),
|
||||||
|
expected: []string{"audit", "sink", "failure"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range tests {
|
||||||
|
name := name
|
||||||
|
tc := tc
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
m := &MetricLabelerAuditSink{}
|
||||||
|
result := m.Labels(nil, tc.err)
|
||||||
|
assert.Equal(t, tc.expected, result)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMetricLabelerAuditFallback_Label ensures we always get the right label based
|
||||||
|
// on the input value of the error for fallback devices.
|
||||||
|
func TestMetricLabelerAuditFallback_Label(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
tests := map[string]struct {
|
||||||
|
err error
|
||||||
|
expected []string
|
||||||
|
}{
|
||||||
|
"nil": {
|
||||||
|
err: nil,
|
||||||
|
expected: []string{"audit", "fallback", "success"},
|
||||||
|
},
|
||||||
|
"error": {
|
||||||
|
err: errors.New("I am an error"),
|
||||||
|
expected: []string{"audit", "sink", "failure"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range tests {
|
||||||
|
name := name
|
||||||
|
tc := tc
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
m := &MetricLabelerAuditFallback{}
|
||||||
|
result := m.Labels(nil, tc.err)
|
||||||
|
assert.Equal(t, tc.expected, result)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
79
audit/sink_metric_timer.go
Normal file
79
audit/sink_metric_timer.go
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
// Copyright (c) HashiCorp, Inc.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/eventlogger"
|
||||||
|
"github.com/hashicorp/vault/internal/observability/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ eventlogger.Node = (*SinkMetricTimer)(nil)
|
||||||
|
|
||||||
|
// SinkMetricTimer is a wrapper for any kind of eventlogger.NodeTypeSink node that
|
||||||
|
// processes events containing an AuditEvent payload.
|
||||||
|
// It decorates the implemented eventlogger.Node Process method in order to emit
|
||||||
|
// timing metrics for the duration between the creation time of the event and the
|
||||||
|
// time the node completes processing.
|
||||||
|
type SinkMetricTimer struct {
|
||||||
|
Name string
|
||||||
|
Sink eventlogger.Node
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSinkMetricTimer should be used to create the SinkMetricTimer.
|
||||||
|
// It expects that an eventlogger.NodeTypeSink should be supplied as the sink.
|
||||||
|
func NewSinkMetricTimer(name string, sink eventlogger.Node) (*SinkMetricTimer, error) {
|
||||||
|
const op = "audit.NewSinkMetricTimer"
|
||||||
|
|
||||||
|
name = strings.TrimSpace(name)
|
||||||
|
if name == "" {
|
||||||
|
return nil, fmt.Errorf("%s: name is required: %w", op, event.ErrInvalidParameter)
|
||||||
|
}
|
||||||
|
|
||||||
|
if sink == nil || reflect.ValueOf(sink).IsNil() {
|
||||||
|
return nil, fmt.Errorf("%s: sink node is required: %w", op, event.ErrInvalidParameter)
|
||||||
|
}
|
||||||
|
|
||||||
|
if sink.Type() != eventlogger.NodeTypeSink {
|
||||||
|
return nil, fmt.Errorf("%s: sink node must be of type 'sink': %w", op, event.ErrInvalidParameter)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &SinkMetricTimer{
|
||||||
|
Name: name,
|
||||||
|
Sink: sink,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process wraps the Process method of underlying sink (eventlogger.Node).
|
||||||
|
// Additionally, when the supplied eventlogger.Event has an AuditEvent as its payload,
|
||||||
|
// it measures the elapsed time between the creation of the eventlogger.Event and
|
||||||
|
// the completion of processing, emitting this as a metric.
|
||||||
|
// Examples:
|
||||||
|
// 'vault.audit.{DEVICE}.log_request'
|
||||||
|
// 'vault.audit.{DEVICE}.log_response'
|
||||||
|
func (s *SinkMetricTimer) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
|
||||||
|
defer func() {
|
||||||
|
auditEvent, ok := e.Payload.(*AuditEvent)
|
||||||
|
if ok {
|
||||||
|
metrics.MeasureSince([]string{"audit", s.Name, auditEvent.Subtype.MetricTag()}, e.CreatedAt)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return s.Sink.Process(ctx, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reopen wraps the Reopen method of this underlying sink (eventlogger.Node).
|
||||||
|
func (s *SinkMetricTimer) Reopen() error {
|
||||||
|
return s.Sink.Reopen()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type wraps the Type method of this underlying sink (eventlogger.Node).
|
||||||
|
func (s *SinkMetricTimer) Type() eventlogger.NodeType {
|
||||||
|
return s.Sink.Type()
|
||||||
|
}
|
||||||
68
audit/sink_metric_timer_test.go
Normal file
68
audit/sink_metric_timer_test.go
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
// Copyright (c) HashiCorp, Inc.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/eventlogger"
|
||||||
|
"github.com/hashicorp/vault/internal/observability/event"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestNewSinkMetricTimer ensures that parameters are checked correctly and errors
|
||||||
|
// reported as expected when attempting to create a SinkMetricTimer.
|
||||||
|
func TestNewSinkMetricTimer(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
tests := map[string]struct {
|
||||||
|
name string
|
||||||
|
node eventlogger.Node
|
||||||
|
isErrorExpected bool
|
||||||
|
expectedErrorMessage string
|
||||||
|
}{
|
||||||
|
"happy": {
|
||||||
|
name: "foo",
|
||||||
|
node: &event.FileSink{},
|
||||||
|
isErrorExpected: false,
|
||||||
|
},
|
||||||
|
"no-name": {
|
||||||
|
name: "",
|
||||||
|
isErrorExpected: true,
|
||||||
|
expectedErrorMessage: "audit.NewSinkMetricTimer: name is required: invalid parameter",
|
||||||
|
},
|
||||||
|
"no-node": {
|
||||||
|
name: "foo",
|
||||||
|
node: nil,
|
||||||
|
isErrorExpected: true,
|
||||||
|
expectedErrorMessage: "audit.NewSinkMetricTimer: sink node is required: invalid parameter",
|
||||||
|
},
|
||||||
|
"bad-node": {
|
||||||
|
name: "foo",
|
||||||
|
node: &EntryFormatter{},
|
||||||
|
isErrorExpected: true,
|
||||||
|
expectedErrorMessage: "audit.NewSinkMetricTimer: sink node must be of type 'sink': invalid parameter",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range tests {
|
||||||
|
name := name
|
||||||
|
tc := tc
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
m, err := NewSinkMetricTimer(tc.name, tc.node)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case tc.isErrorExpected:
|
||||||
|
require.Error(t, err)
|
||||||
|
require.EqualError(t, err, tc.expectedErrorMessage)
|
||||||
|
require.Nil(t, m)
|
||||||
|
default:
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, m)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,47 +0,0 @@
|
|||||||
// Copyright (c) HashiCorp, Inc.
|
|
||||||
// SPDX-License-Identifier: BUSL-1.1
|
|
||||||
|
|
||||||
package audit
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
metrics "github.com/armon/go-metrics"
|
|
||||||
|
|
||||||
"github.com/hashicorp/eventlogger"
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ eventlogger.Node = (*SinkWrapper)(nil)
|
|
||||||
|
|
||||||
// SinkWrapper is a wrapper for any kind of Sink Node that processes events
|
|
||||||
// containing an AuditEvent payload.
|
|
||||||
type SinkWrapper struct {
|
|
||||||
Name string
|
|
||||||
Sink eventlogger.Node
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process simply wraps the Process method of this SinkWrapper's sink field by
|
|
||||||
// taking a measurement of the time elapsed since the provided Event was created
|
|
||||||
// once this method returns.
|
|
||||||
func (s *SinkWrapper) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
|
|
||||||
defer func() {
|
|
||||||
auditEvent, ok := e.Payload.(*AuditEvent)
|
|
||||||
if ok {
|
|
||||||
metrics.MeasureSince([]string{"audit", s.Name, auditEvent.Subtype.MetricTag()}, e.CreatedAt)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return s.Sink.Process(ctx, e)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reopen simply wraps the Reopen method of this SinkWrapper's sink field
|
|
||||||
// without doing any additional work.
|
|
||||||
func (s *SinkWrapper) Reopen() error {
|
|
||||||
return s.Sink.Reopen()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Type simply wraps the Type method of this SinkWrapper's sink field without
|
|
||||||
// doing any additional work.
|
|
||||||
func (s *SinkWrapper) Type() eventlogger.NodeType {
|
|
||||||
return s.Sink.Type()
|
|
||||||
}
|
|
||||||
@@ -464,6 +464,7 @@ func (b *Backend) configureFilterNode(filter string) error {
|
|||||||
|
|
||||||
b.nodeIDList = append(b.nodeIDList, filterNodeID)
|
b.nodeIDList = append(b.nodeIDList, filterNodeID)
|
||||||
b.nodeMap[filterNodeID] = filterNode
|
b.nodeMap[filterNodeID] = filterNode
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -483,6 +484,7 @@ func (b *Backend) configureFormatterNode(formatConfig audit.FormatterConfig, opt
|
|||||||
|
|
||||||
b.nodeIDList = append(b.nodeIDList, formatterNodeID)
|
b.nodeIDList = append(b.nodeIDList, formatterNodeID)
|
||||||
b.nodeMap[formatterNodeID] = formatterNode
|
b.nodeMap[formatterNodeID] = formatterNode
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -537,10 +539,29 @@ func (b *Backend) configureSinkNode(name string, filePath string, mode string, f
|
|||||||
return fmt.Errorf("%s: file sink creation failed for path %q: %w", op, filePath, err)
|
return fmt.Errorf("%s: file sink creation failed for path %q: %w", op, filePath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sinkNode = &audit.SinkWrapper{Name: sinkName, Sink: sinkNode}
|
// Wrap the sink node with metrics middleware
|
||||||
|
sinkMetricTimer, err := audit.NewSinkMetricTimer(sinkName, sinkNode)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: unable to add timing metrics to sink for path %q: %w", op, filePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decide what kind of labels we want and wrap the sink node inside a metrics counter.
|
||||||
|
var metricLabeler event.Labeler
|
||||||
|
switch {
|
||||||
|
case b.fallback:
|
||||||
|
metricLabeler = &audit.MetricLabelerAuditFallback{}
|
||||||
|
default:
|
||||||
|
metricLabeler = &audit.MetricLabelerAuditSink{}
|
||||||
|
}
|
||||||
|
|
||||||
|
sinkMetricCounter, err := event.NewMetricsCounter(sinkName, sinkMetricTimer, metricLabeler)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: unable to add counting metrics to sink for path %q: %w", op, filePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
b.nodeIDList = append(b.nodeIDList, sinkNodeID)
|
b.nodeIDList = append(b.nodeIDList, sinkNodeID)
|
||||||
b.nodeMap[sinkNodeID] = sinkNode
|
b.nodeMap[sinkNodeID] = sinkMetricCounter
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -566,6 +587,10 @@ func (b *Backend) EventType() eventlogger.EventType {
|
|||||||
|
|
||||||
// HasFiltering determines if the first node for the pipeline is an eventlogger.NodeTypeFilter.
|
// HasFiltering determines if the first node for the pipeline is an eventlogger.NodeTypeFilter.
|
||||||
func (b *Backend) HasFiltering() bool {
|
func (b *Backend) HasFiltering() bool {
|
||||||
|
if b.nodeMap == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
return len(b.nodeIDList) > 0 && b.nodeMap[b.nodeIDList[0]].Type() == eventlogger.NodeTypeFilter
|
return len(b.nodeIDList) > 0 && b.nodeMap[b.nodeIDList[0]].Type() == eventlogger.NodeTypeFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/hashicorp/eventlogger"
|
"github.com/hashicorp/eventlogger"
|
||||||
"github.com/hashicorp/vault/audit"
|
"github.com/hashicorp/vault/audit"
|
||||||
"github.com/hashicorp/vault/helper/namespace"
|
"github.com/hashicorp/vault/helper/namespace"
|
||||||
|
"github.com/hashicorp/vault/internal/observability/event"
|
||||||
"github.com/hashicorp/vault/sdk/helper/salt"
|
"github.com/hashicorp/vault/sdk/helper/salt"
|
||||||
"github.com/hashicorp/vault/sdk/logical"
|
"github.com/hashicorp/vault/sdk/logical"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@@ -529,9 +530,9 @@ func TestBackend_configureSinkNode(t *testing.T) {
|
|||||||
id := b.nodeIDList[0]
|
id := b.nodeIDList[0]
|
||||||
node := b.nodeMap[id]
|
node := b.nodeMap[id]
|
||||||
require.Equal(t, eventlogger.NodeTypeSink, node.Type())
|
require.Equal(t, eventlogger.NodeTypeSink, node.Type())
|
||||||
sw, ok := node.(*audit.SinkWrapper)
|
mc, ok := node.(*event.MetricsCounter)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, tc.expectedName, sw.Name)
|
require.Equal(t, tc.expectedName, mc.Name)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -377,6 +377,7 @@ func (b *Backend) configureFilterNode(filter string) error {
|
|||||||
|
|
||||||
b.nodeIDList = append(b.nodeIDList, filterNodeID)
|
b.nodeIDList = append(b.nodeIDList, filterNodeID)
|
||||||
b.nodeMap[filterNodeID] = filterNode
|
b.nodeMap[filterNodeID] = filterNode
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -396,6 +397,7 @@ func (b *Backend) configureFormatterNode(formatConfig audit.FormatterConfig, opt
|
|||||||
|
|
||||||
b.nodeIDList = append(b.nodeIDList, formatterNodeID)
|
b.nodeIDList = append(b.nodeIDList, formatterNodeID)
|
||||||
b.nodeMap[formatterNodeID] = formatterNode
|
b.nodeMap[formatterNodeID] = formatterNode
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -428,10 +430,29 @@ func (b *Backend) configureSinkNode(name string, address string, format string,
|
|||||||
return fmt.Errorf("%s: error creating socket sink node: %w", op, err)
|
return fmt.Errorf("%s: error creating socket sink node: %w", op, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sinkNode := &audit.SinkWrapper{Name: name, Sink: n}
|
// Wrap the sink node with metrics middleware
|
||||||
|
sinkMetricTimer, err := audit.NewSinkMetricTimer(name, n)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: unable to add timing metrics to sink for path %q: %w", op, name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decide what kind of labels we want and wrap the sink node inside a metrics counter.
|
||||||
|
var metricLabeler event.Labeler
|
||||||
|
switch {
|
||||||
|
case b.fallback:
|
||||||
|
metricLabeler = &audit.MetricLabelerAuditFallback{}
|
||||||
|
default:
|
||||||
|
metricLabeler = &audit.MetricLabelerAuditSink{}
|
||||||
|
}
|
||||||
|
|
||||||
|
sinkMetricCounter, err := event.NewMetricsCounter(name, sinkMetricTimer, metricLabeler)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: unable to add counting metrics to sink for path %q: %w", op, name, err)
|
||||||
|
}
|
||||||
|
|
||||||
b.nodeIDList = append(b.nodeIDList, sinkNodeID)
|
b.nodeIDList = append(b.nodeIDList, sinkNodeID)
|
||||||
b.nodeMap[sinkNodeID] = sinkNode
|
b.nodeMap[sinkNodeID] = sinkMetricCounter
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -457,6 +478,10 @@ func (b *Backend) EventType() eventlogger.EventType {
|
|||||||
|
|
||||||
// HasFiltering determines if the first node for the pipeline is an eventlogger.NodeTypeFilter.
|
// HasFiltering determines if the first node for the pipeline is an eventlogger.NodeTypeFilter.
|
||||||
func (b *Backend) HasFiltering() bool {
|
func (b *Backend) HasFiltering() bool {
|
||||||
|
if b.nodeMap == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
return len(b.nodeIDList) > 0 && b.nodeMap[b.nodeIDList[0]].Type() == eventlogger.NodeTypeFilter
|
return len(b.nodeIDList) > 0 && b.nodeMap[b.nodeIDList[0]].Type() == eventlogger.NodeTypeFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"github.com/hashicorp/eventlogger"
|
"github.com/hashicorp/eventlogger"
|
||||||
"github.com/hashicorp/vault/audit"
|
"github.com/hashicorp/vault/audit"
|
||||||
|
"github.com/hashicorp/vault/internal/observability/event"
|
||||||
"github.com/hashicorp/vault/sdk/helper/salt"
|
"github.com/hashicorp/vault/sdk/helper/salt"
|
||||||
"github.com/hashicorp/vault/sdk/logical"
|
"github.com/hashicorp/vault/sdk/logical"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@@ -285,9 +286,9 @@ func TestBackend_configureSinkNode(t *testing.T) {
|
|||||||
id := b.nodeIDList[0]
|
id := b.nodeIDList[0]
|
||||||
node := b.nodeMap[id]
|
node := b.nodeMap[id]
|
||||||
require.Equal(t, eventlogger.NodeTypeSink, node.Type())
|
require.Equal(t, eventlogger.NodeTypeSink, node.Type())
|
||||||
sw, ok := node.(*audit.SinkWrapper)
|
mc, ok := node.(*event.MetricsCounter)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, tc.expectedName, sw.Name)
|
require.Equal(t, tc.expectedName, mc.Name)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -287,6 +287,7 @@ func (b *Backend) configureFilterNode(filter string) error {
|
|||||||
|
|
||||||
b.nodeIDList = append(b.nodeIDList, filterNodeID)
|
b.nodeIDList = append(b.nodeIDList, filterNodeID)
|
||||||
b.nodeMap[filterNodeID] = filterNode
|
b.nodeMap[filterNodeID] = filterNode
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -306,6 +307,7 @@ func (b *Backend) configureFormatterNode(formatConfig audit.FormatterConfig, opt
|
|||||||
|
|
||||||
b.nodeIDList = append(b.nodeIDList, formatterNodeID)
|
b.nodeIDList = append(b.nodeIDList, formatterNodeID)
|
||||||
b.nodeMap[formatterNodeID] = formatterNode
|
b.nodeMap[formatterNodeID] = formatterNode
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -333,11 +335,29 @@ func (b *Backend) configureSinkNode(name string, format string, opts ...event.Op
|
|||||||
return fmt.Errorf("%s: error creating syslog sink node: %w", op, err)
|
return fmt.Errorf("%s: error creating syslog sink node: %w", op, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrap the sink node with metrics middleware
|
// Wrap the sink node with metrics middleware
|
||||||
sinkNode := &audit.SinkWrapper{Name: name, Sink: n}
|
sinkMetricTimer, err := audit.NewSinkMetricTimer(name, n)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: unable to add timing metrics to sink for path %q: %w", op, name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decide what kind of labels we want and wrap the sink node inside a metrics counter.
|
||||||
|
var metricLabeler event.Labeler
|
||||||
|
switch {
|
||||||
|
case b.fallback:
|
||||||
|
metricLabeler = &audit.MetricLabelerAuditFallback{}
|
||||||
|
default:
|
||||||
|
metricLabeler = &audit.MetricLabelerAuditSink{}
|
||||||
|
}
|
||||||
|
|
||||||
|
sinkMetricCounter, err := event.NewMetricsCounter(name, sinkMetricTimer, metricLabeler)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%s: unable to add counting metrics to sink for path %q: %w", op, name, err)
|
||||||
|
}
|
||||||
|
|
||||||
b.nodeIDList = append(b.nodeIDList, sinkNodeID)
|
b.nodeIDList = append(b.nodeIDList, sinkNodeID)
|
||||||
b.nodeMap[sinkNodeID] = sinkNode
|
b.nodeMap[sinkNodeID] = sinkMetricCounter
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -363,6 +383,10 @@ func (b *Backend) EventType() eventlogger.EventType {
|
|||||||
|
|
||||||
// HasFiltering determines if the first node for the pipeline is an eventlogger.NodeTypeFilter.
|
// HasFiltering determines if the first node for the pipeline is an eventlogger.NodeTypeFilter.
|
||||||
func (b *Backend) HasFiltering() bool {
|
func (b *Backend) HasFiltering() bool {
|
||||||
|
if b.nodeMap == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
return len(b.nodeIDList) > 0 && b.nodeMap[b.nodeIDList[0]].Type() == eventlogger.NodeTypeFilter
|
return len(b.nodeIDList) > 0 && b.nodeMap[b.nodeIDList[0]].Type() == eventlogger.NodeTypeFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"github.com/hashicorp/eventlogger"
|
"github.com/hashicorp/eventlogger"
|
||||||
"github.com/hashicorp/vault/audit"
|
"github.com/hashicorp/vault/audit"
|
||||||
|
"github.com/hashicorp/vault/internal/observability/event"
|
||||||
"github.com/hashicorp/vault/sdk/helper/salt"
|
"github.com/hashicorp/vault/sdk/helper/salt"
|
||||||
"github.com/hashicorp/vault/sdk/logical"
|
"github.com/hashicorp/vault/sdk/logical"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@@ -267,9 +268,9 @@ func TestBackend_configureSinkNode(t *testing.T) {
|
|||||||
id := b.nodeIDList[0]
|
id := b.nodeIDList[0]
|
||||||
node := b.nodeMap[id]
|
node := b.nodeMap[id]
|
||||||
require.Equal(t, eventlogger.NodeTypeSink, node.Type())
|
require.Equal(t, eventlogger.NodeTypeSink, node.Type())
|
||||||
sw, ok := node.(*audit.SinkWrapper)
|
mc, ok := node.(*event.MetricsCounter)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, tc.expectedName, sw.Name)
|
require.Equal(t, tc.expectedName, mc.Name)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
79
internal/observability/event/node_metrics_counter.go
Normal file
79
internal/observability/event/node_metrics_counter.go
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
// Copyright (c) HashiCorp, Inc.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/eventlogger"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ eventlogger.Node = (*MetricsCounter)(nil)
|
||||||
|
|
||||||
|
// MetricsCounter offers a way for nodes to emit metrics which increment a label by 1.
|
||||||
|
type MetricsCounter struct {
|
||||||
|
Name string
|
||||||
|
Node eventlogger.Node
|
||||||
|
labeler Labeler
|
||||||
|
}
|
||||||
|
|
||||||
|
// Labeler provides a way to inject the logic required to determine labels based
|
||||||
|
// on the state of the eventlogger.Event being returned and the error resulting
|
||||||
|
// from processing the by the underlying eventlogger.Node.
|
||||||
|
type Labeler interface {
|
||||||
|
Labels(*eventlogger.Event, error) []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMetricsCounter should be used to create the MetricsCounter.
|
||||||
|
func NewMetricsCounter(name string, node eventlogger.Node, labeler Labeler) (*MetricsCounter, error) {
|
||||||
|
const op = "event.NewMetricsCounter"
|
||||||
|
|
||||||
|
name = strings.TrimSpace(name)
|
||||||
|
if name == "" {
|
||||||
|
return nil, fmt.Errorf("%s: name is required: %w", op, ErrInvalidParameter)
|
||||||
|
}
|
||||||
|
|
||||||
|
if node == nil || reflect.ValueOf(node).IsNil() {
|
||||||
|
return nil, fmt.Errorf("%s: node is required: %w", op, ErrInvalidParameter)
|
||||||
|
}
|
||||||
|
|
||||||
|
if labeler == nil || reflect.ValueOf(labeler).IsNil() {
|
||||||
|
return nil, fmt.Errorf("%s: labeler is required: %w", op, ErrInvalidParameter)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &MetricsCounter{
|
||||||
|
Name: name,
|
||||||
|
Node: node,
|
||||||
|
labeler: labeler,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process will process the event using the underlying eventlogger.Node, and then
|
||||||
|
// use the configured Labeler to provide a label which is used to increment a metric by 1.
|
||||||
|
func (m MetricsCounter) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
|
||||||
|
// NOTE: We don't provide an 'op' here, as we're just wrapping the underlying node.
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Process the node first
|
||||||
|
e, err = m.Node.Process(ctx, e)
|
||||||
|
|
||||||
|
// Provide the results to the Labeler.
|
||||||
|
metrics.IncrCounter(m.labeler.Labels(e, err), 1)
|
||||||
|
|
||||||
|
return e, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reopen attempts to reopen the underlying eventlogger.Node.
|
||||||
|
func (m MetricsCounter) Reopen() error {
|
||||||
|
return m.Node.Reopen()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type returns the type for the underlying eventlogger.Node.
|
||||||
|
func (m MetricsCounter) Type() eventlogger.NodeType {
|
||||||
|
return m.Node.Type()
|
||||||
|
}
|
||||||
97
internal/observability/event/node_metrics_counter_test.go
Normal file
97
internal/observability/event/node_metrics_counter_test.go
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
// Copyright (c) HashiCorp, Inc.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/eventlogger"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ eventlogger.Node = (*testEventLoggerNode)(nil)
|
||||||
|
_ Labeler = (*testMetricsCounter)(nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestNewMetricsCounter ensures that NewMetricsCounter operates as intended and
|
||||||
|
// can validate the input parameters correctly, returning the right error message
|
||||||
|
// when required.
|
||||||
|
func TestNewMetricsCounter(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
tests := map[string]struct {
|
||||||
|
name string
|
||||||
|
node eventlogger.Node
|
||||||
|
labeler Labeler
|
||||||
|
isErrorExpected bool
|
||||||
|
expectedErrorMessage string
|
||||||
|
}{
|
||||||
|
"happy": {
|
||||||
|
name: "foo",
|
||||||
|
node: &testEventLoggerNode{},
|
||||||
|
labeler: &testMetricsCounter{},
|
||||||
|
isErrorExpected: false,
|
||||||
|
},
|
||||||
|
"no-name": {
|
||||||
|
node: nil,
|
||||||
|
labeler: nil,
|
||||||
|
isErrorExpected: true,
|
||||||
|
expectedErrorMessage: "event.NewMetricsCounter: name is required: invalid parameter",
|
||||||
|
},
|
||||||
|
"no-node": {
|
||||||
|
name: "foo",
|
||||||
|
node: nil,
|
||||||
|
isErrorExpected: true,
|
||||||
|
expectedErrorMessage: "event.NewMetricsCounter: node is required: invalid parameter",
|
||||||
|
},
|
||||||
|
"no-labeler": {
|
||||||
|
name: "foo",
|
||||||
|
node: &testEventLoggerNode{},
|
||||||
|
labeler: nil,
|
||||||
|
isErrorExpected: true,
|
||||||
|
expectedErrorMessage: "event.NewMetricsCounter: labeler is required: invalid parameter",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range tests {
|
||||||
|
name := name
|
||||||
|
tc := tc
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
m, err := NewMetricsCounter(tc.name, tc.node, tc.labeler)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case tc.isErrorExpected:
|
||||||
|
require.Error(t, err)
|
||||||
|
require.EqualError(t, err, tc.expectedErrorMessage)
|
||||||
|
default:
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, m)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// testEventLoggerNode is for testing and implements the eventlogger.Node interface.
|
||||||
|
type testEventLoggerNode struct{}
|
||||||
|
|
||||||
|
func (t testEventLoggerNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t testEventLoggerNode) Reopen() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t testEventLoggerNode) Type() eventlogger.NodeType {
|
||||||
|
return eventlogger.NodeTypeSink
|
||||||
|
}
|
||||||
|
|
||||||
|
// testMetricsCounter is for testing and implements the event.Labeler interface.
|
||||||
|
type testMetricsCounter struct{}
|
||||||
|
|
||||||
|
func (m *testMetricsCounter) Labels(_ *eventlogger.Event, err error) []string {
|
||||||
|
return []string{""}
|
||||||
|
}
|
||||||
@@ -12,9 +12,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/eventlogger"
|
"github.com/hashicorp/eventlogger"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-metrics"
|
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
"github.com/hashicorp/vault/audit"
|
"github.com/hashicorp/vault/audit"
|
||||||
"github.com/hashicorp/vault/helper/namespace"
|
"github.com/hashicorp/vault/helper/namespace"
|
||||||
@@ -329,6 +329,9 @@ func (a *AuditBroker) LogRequest(ctx context.Context, in *logical.LogInput, head
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
retErr = multierror.Append(retErr, multierror.Append(fmt.Errorf("auditing request to fallback device failed: %w", err), status.Warnings...))
|
retErr = multierror.Append(retErr, multierror.Append(fmt.Errorf("auditing request to fallback device failed: %w", err), status.Warnings...))
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// This audit event won't make it to any devices, we class this as a 'miss' for auditing.
|
||||||
|
metrics.IncrCounter(audit.MetricLabelsFallbackMiss(), 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -454,6 +457,9 @@ func (a *AuditBroker) LogResponse(ctx context.Context, in *logical.LogInput, hea
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
retErr = multierror.Append(retErr, multierror.Append(fmt.Errorf("auditing response to fallback device failed: %w", err), status.Warnings...))
|
retErr = multierror.Append(retErr, multierror.Append(fmt.Errorf("auditing response to fallback device failed: %w", err), status.Warnings...))
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// This audit event won't make it to any devices, we class this as a 'miss' for auditing.
|
||||||
|
metrics.IncrCounter(audit.MetricLabelsFallbackMiss(), 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user