mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-29 09:42:25 +00:00
* Add trace logging to context creation during log req/resp. Improve context sensitivity of sink nodes (file, socket), update eventlogger to include context info in error * changelog * Queue for the lock but check context immediately * fix race in test
239 lines
6.2 KiB
Go
239 lines
6.2 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package event
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/hashicorp/eventlogger"
|
|
"github.com/hashicorp/go-hclog"
|
|
)
|
|
|
|
// defaultFileMode is the default file permissions (read/write for everyone).
|
|
const (
|
|
defaultFileMode = 0o600
|
|
devnull = "/dev/null"
|
|
)
|
|
|
|
var _ eventlogger.Node = (*FileSink)(nil)
|
|
|
|
// FileSink is a sink node which handles writing events to file.
|
|
type FileSink struct {
|
|
file *os.File
|
|
fileLock sync.RWMutex
|
|
fileMode os.FileMode
|
|
path string
|
|
requiredFormat string
|
|
logger hclog.Logger
|
|
}
|
|
|
|
// NewFileSink should be used to create a new FileSink.
|
|
// Accepted options: WithFileMode.
|
|
func NewFileSink(path string, format string, opt ...Option) (*FileSink, error) {
|
|
// Parse and check path
|
|
p := strings.TrimSpace(path)
|
|
if p == "" {
|
|
return nil, fmt.Errorf("path is required: %w", ErrInvalidParameter)
|
|
}
|
|
|
|
opts, err := getOpts(opt...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mode := os.FileMode(defaultFileMode)
|
|
// If we got an optional file mode supplied and our path isn't a special keyword
|
|
// then we should use the supplied file mode, or maintain the existing file mode.
|
|
switch {
|
|
case path == devnull:
|
|
case opts.withFileMode == nil:
|
|
case *opts.withFileMode == 0: // Maintain the existing file's mode when set to "0000".
|
|
fileInfo, err := os.Stat(path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to determine existing file mode: %w", err)
|
|
}
|
|
mode = fileInfo.Mode()
|
|
default:
|
|
mode = *opts.withFileMode
|
|
}
|
|
|
|
sink := &FileSink{
|
|
file: nil,
|
|
fileLock: sync.RWMutex{},
|
|
fileMode: mode,
|
|
requiredFormat: format,
|
|
path: p,
|
|
logger: opts.withLogger,
|
|
}
|
|
|
|
// Ensure that the file can be successfully opened for writing;
|
|
// otherwise it will be too late to catch later without problems
|
|
// (ref: https://github.com/hashicorp/vault/issues/550)
|
|
if err := sink.open(); err != nil {
|
|
return nil, fmt.Errorf("sanity check failed; unable to open %q for writing: %w", sink.path, err)
|
|
}
|
|
|
|
return sink, nil
|
|
}
|
|
|
|
// Process handles writing the event to the file sink.
|
|
func (s *FileSink) Process(ctx context.Context, e *eventlogger.Event) (_ *eventlogger.Event, retErr error) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
defer func() {
|
|
// If the context is errored (cancelled), and we were planning to return
|
|
// an error, let's also log (if we have a logger) in case the eventlogger's
|
|
// status channel and errors propagated.
|
|
if err := ctx.Err(); err != nil && retErr != nil && s.logger != nil {
|
|
s.logger.Error("file sink error", "context", err, "error", retErr)
|
|
}
|
|
}()
|
|
|
|
if e == nil {
|
|
return nil, fmt.Errorf("event is nil: %w", ErrInvalidParameter)
|
|
}
|
|
|
|
// '/dev/null' path means we just do nothing and pretend we're done.
|
|
if s.path == devnull {
|
|
return nil, nil
|
|
}
|
|
|
|
formatted, found := e.Format(s.requiredFormat)
|
|
if !found {
|
|
return nil, fmt.Errorf("unable to retrieve event formatted as %q: %w", s.requiredFormat, ErrInvalidParameter)
|
|
}
|
|
|
|
err := s.log(ctx, formatted)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error writing file for sink %q: %w", s.path, err)
|
|
}
|
|
|
|
// return nil for the event to indicate the pipeline is complete.
|
|
return nil, nil
|
|
}
|
|
|
|
// Reopen handles closing and reopening the file.
|
|
func (s *FileSink) Reopen() error {
|
|
// '/dev/null' path means we just do nothing and pretend we're done.
|
|
if s.path == devnull {
|
|
return nil
|
|
}
|
|
|
|
s.fileLock.Lock()
|
|
defer s.fileLock.Unlock()
|
|
|
|
if s.file == nil {
|
|
return s.open()
|
|
}
|
|
|
|
err := s.file.Close()
|
|
// Set to nil here so that even if we error out, on the next access open() will be tried.
|
|
s.file = nil
|
|
if err != nil {
|
|
return fmt.Errorf("unable to close file for re-opening on sink %q: %w", s.path, err)
|
|
}
|
|
|
|
return s.open()
|
|
}
|
|
|
|
// Type describes the type of this node (sink).
|
|
func (s *FileSink) Type() eventlogger.NodeType {
|
|
return eventlogger.NodeTypeSink
|
|
}
|
|
|
|
// open attempts to open a file at the sink's path, with the sink's fileMode permissions
|
|
// if one is not already open.
|
|
// It doesn't have any locking and relies on calling functions of FileSink to
|
|
// handle this (e.g. log and Reopen methods).
|
|
func (s *FileSink) open() error {
|
|
if s.file != nil {
|
|
return nil
|
|
}
|
|
|
|
if err := os.MkdirAll(filepath.Dir(s.path), s.fileMode); err != nil {
|
|
return fmt.Errorf("unable to create file %q: %w", s.path, err)
|
|
}
|
|
|
|
var err error
|
|
s.file, err = os.OpenFile(s.path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, s.fileMode)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to open file for sink %q: %w", s.path, err)
|
|
}
|
|
|
|
// Change the file mode in case the log file already existed.
|
|
// We special case '/dev/null' since we can't chmod it, and bypass if the mode is zero.
|
|
switch s.path {
|
|
case devnull:
|
|
default:
|
|
if s.fileMode != 0 {
|
|
err = os.Chmod(s.path, s.fileMode)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to change file permissions '%v' for sink %q: %w", s.fileMode, s.path, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// log writes the buffer to the file.
|
|
// NOTE: We attempt to acquire a lock on the file in order to write, but will
|
|
// yield if the context is 'done'.
|
|
func (s *FileSink) log(ctx context.Context, data []byte) error {
|
|
// Wait for the lock, but ensure we check for a cancelled context as soon as
|
|
// we have it, as there's no point in continuing if we're cancelled.
|
|
s.fileLock.Lock()
|
|
defer s.fileLock.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
reader := bytes.NewReader(data)
|
|
|
|
if err := s.open(); err != nil {
|
|
return fmt.Errorf("unable to open file for sink %q: %w", s.path, err)
|
|
}
|
|
|
|
if _, err := reader.WriteTo(s.file); err == nil {
|
|
return nil
|
|
}
|
|
|
|
// Otherwise, opportunistically try to re-open the FD, once per call (1 retry attempt).
|
|
err := s.file.Close()
|
|
if err != nil {
|
|
return fmt.Errorf("unable to close file for sink %q: %w", s.path, err)
|
|
}
|
|
|
|
s.file = nil
|
|
|
|
if err := s.open(); err != nil {
|
|
return fmt.Errorf("unable to re-open file for sink %q: %w", s.path, err)
|
|
}
|
|
|
|
_, err = reader.Seek(0, io.SeekStart)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to seek to start of file for sink %q: %w", s.path, err)
|
|
}
|
|
|
|
_, err = reader.WriteTo(s.file)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to re-write to file for sink %q: %w", s.path, err)
|
|
}
|
|
|
|
return nil
|
|
}
|