mirror of
				https://github.com/optim-enterprises-bv/vault.git
				synced 2025-10-29 09:42:25 +00:00 
			
		
		
		
	 ce74f4f1de
			
		
	
	ce74f4f1de
	
	
	
		
			
			Add some metrics helpful for monitoring raft cluster state. Furthermore, we weren't emitting bolt metrics on regular (non-perf) standbys, and there were other metrics in metricsLoop that would make sense to include in OSS but weren't. We now have an active-node-only func, emitMetricsActiveNode. This runs metricsLoop on the active node. Standbys and perf-standbys run metricsLoop from a goroutine managed by the runStandby rungroup.
		
			
				
	
	
		
			1107 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1107 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package raft
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"encoding/hex"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/armon/go-metrics"
 | |
| 	"github.com/golang/protobuf/proto"
 | |
| 	log "github.com/hashicorp/go-hclog"
 | |
| 	"github.com/hashicorp/go-multierror"
 | |
| 	"github.com/hashicorp/go-raftchunking"
 | |
| 	"github.com/hashicorp/go-secure-stdlib/strutil"
 | |
| 	"github.com/hashicorp/raft"
 | |
| 	"github.com/hashicorp/vault/sdk/helper/jsonutil"
 | |
| 	"github.com/hashicorp/vault/sdk/physical"
 | |
| 	"github.com/hashicorp/vault/sdk/plugin/pb"
 | |
| 	bolt "go.etcd.io/bbolt"
 | |
| )
 | |
| 
 | |
// Operation types found in the OpType field of a logged operation. Each
// value occupies its own bit.
const (
	deleteOp uint32 = 1 << iota
	putOp
	restoreCallbackOp
	getOp
)

const (
	// chunkingPrefix is prepended to the keys under which chunk info is
	// stored.
	chunkingPrefix = "raftchunking/"
	// databaseFilename is the name of the bolt file inside the FSM's
	// directory.
	databaseFilename = "vault.db"
)
 | |
| 
 | |
// Bucket names and keys used inside the bolt file.
var (
	// dataBucketName is the bucket that holds the stored key/value entries.
	dataBucketName = []byte("data")
	// configBucketName is the bucket that holds the index, raft
	// configuration and local node configuration records.
	configBucketName = []byte("config")
	// latestIndexKey is the config-bucket key of the marshaled IndexValue.
	latestIndexKey = []byte("latest_indexes")
	// latestConfigKey is the config-bucket key of the marshaled
	// ConfigurationValue.
	latestConfigKey = []byte("latest_config")
	// localNodeConfigKey is the config-bucket key of the marshaled
	// LocalNodeConfigValue.
	localNodeConfigKey = []byte("local_node_config")
)
 | |
| 
 | |
| // Verify FSM satisfies the correct interfaces
 | |
| var (
 | |
| 	_ physical.Backend       = (*FSM)(nil)
 | |
| 	_ physical.Transactional = (*FSM)(nil)
 | |
| 	_ raft.FSM               = (*FSM)(nil)
 | |
| 	_ raft.BatchingFSM       = (*FSM)(nil)
 | |
| )
 | |
| 
 | |
// restoreCallback is the signature of the function the FSM launches when it
// applies a restoreCallbackOp operation.
type restoreCallback func(context.Context) error
 | |
| 
 | |
// FSMEntry is a single key/value pair read out of the FSM.
type FSMEntry struct {
	Key   string
	Value []byte
}

// String renders the entry as text with the value hex-encoded.
func (f *FSMEntry) String() string {
	hexValue := hex.EncodeToString(f.Value)
	return "Key: " + f.Key + ". Value: " + hexValue
}
 | |
| 
 | |
| // FSMApplyResponse is returned from an FSM apply. It indicates if the apply was
 | |
| // successful or not. EntryMap contains the keys/values from the Get operations.
 | |
| type FSMApplyResponse struct {
 | |
| 	Success    bool
 | |
| 	EntrySlice []*FSMEntry
 | |
| }
 | |
| 
 | |
| // FSM is Vault's primary state storage. It writes updates to a bolt db file
 | |
| // that lives on local disk. FSM implements raft.FSM and physical.Backend
 | |
| // interfaces.
 | |
| type FSM struct {
 | |
| 	// latestIndex and latestTerm must stay at the top of this struct to be
 | |
| 	// properly 64-bit aligned.
 | |
| 
 | |
| 	// latestIndex and latestTerm are the term and index of the last log we
 | |
| 	// received
 | |
| 	latestIndex *uint64
 | |
| 	latestTerm  *uint64
 | |
| 	// latestConfig is the latest server configuration we've seen
 | |
| 	latestConfig atomic.Value
 | |
| 
 | |
| 	l           sync.RWMutex
 | |
| 	path        string
 | |
| 	logger      log.Logger
 | |
| 	noopRestore bool
 | |
| 
 | |
| 	// applyCallback is used to control the pace of applies in tests
 | |
| 	applyCallback func()
 | |
| 
 | |
| 	db *bolt.DB
 | |
| 
 | |
| 	// retoreCb is called after we've restored a snapshot
 | |
| 	restoreCb restoreCallback
 | |
| 
 | |
| 	chunker *raftchunking.ChunkingBatchingFSM
 | |
| 
 | |
| 	localID         string
 | |
| 	desiredSuffrage string
 | |
| }
 | |
| 
 | |
| // NewFSM constructs a FSM using the given directory
 | |
| func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) {
 | |
| 	// Initialize the latest term, index, and config values
 | |
| 	latestTerm := new(uint64)
 | |
| 	latestIndex := new(uint64)
 | |
| 	latestConfig := atomic.Value{}
 | |
| 	atomic.StoreUint64(latestTerm, 0)
 | |
| 	atomic.StoreUint64(latestIndex, 0)
 | |
| 	latestConfig.Store((*ConfigurationValue)(nil))
 | |
| 
 | |
| 	f := &FSM{
 | |
| 		path:   path,
 | |
| 		logger: logger,
 | |
| 
 | |
| 		latestTerm:   latestTerm,
 | |
| 		latestIndex:  latestIndex,
 | |
| 		latestConfig: latestConfig,
 | |
| 		// Assume that the default intent is to join as as voter. This will be updated
 | |
| 		// when this node joins a cluster with a different suffrage, or during cluster
 | |
| 		// setup if this is already part of a cluster with a desired suffrage.
 | |
| 		desiredSuffrage: "voter",
 | |
| 		localID:         localID,
 | |
| 	}
 | |
| 
 | |
| 	f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{
 | |
| 		f:   f,
 | |
| 		ctx: context.Background(),
 | |
| 	})
 | |
| 
 | |
| 	dbPath := filepath.Join(path, databaseFilename)
 | |
| 	f.l.Lock()
 | |
| 	defer f.l.Unlock()
 | |
| 	if err := f.openDBFile(dbPath); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to open bolt file: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return f, nil
 | |
| }
 | |
| 
 | |
| func (f *FSM) getDB() *bolt.DB {
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	return f.db
 | |
| }
 | |
| 
 | |
| // SetFSMDelay adds a delay to the FSM apply. This is used in tests to simulate
 | |
| // a slow apply.
 | |
| func (r *RaftBackend) SetFSMDelay(delay time.Duration) {
 | |
| 	r.SetFSMApplyCallback(func() { time.Sleep(delay) })
 | |
| }
 | |
| 
 | |
| func (r *RaftBackend) SetFSMApplyCallback(f func()) {
 | |
| 	r.fsm.l.Lock()
 | |
| 	r.fsm.applyCallback = f
 | |
| 	r.fsm.l.Unlock()
 | |
| }
 | |
| 
 | |
| func (f *FSM) openDBFile(dbPath string) error {
 | |
| 	if len(dbPath) == 0 {
 | |
| 		return errors.New("can not open empty filename")
 | |
| 	}
 | |
| 
 | |
| 	st, err := os.Stat(dbPath)
 | |
| 	switch {
 | |
| 	case err != nil && os.IsNotExist(err):
 | |
| 	case err != nil:
 | |
| 		return fmt.Errorf("error checking raft FSM db file %q: %v", dbPath, err)
 | |
| 	default:
 | |
| 		perms := st.Mode() & os.ModePerm
 | |
| 		if perms&0o077 != 0 {
 | |
| 			f.logger.Warn("raft FSM db file has wider permissions than needed",
 | |
| 				"needed", os.FileMode(0o600), "existing", perms)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	opts := boltOptions(dbPath)
 | |
| 	start := time.Now()
 | |
| 	boltDB, err := bolt.Open(dbPath, 0o600, opts)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	elapsed := time.Now().Sub(start)
 | |
| 	f.logger.Debug("time to open database", "elapsed", elapsed, "path", dbPath)
 | |
| 	metrics.MeasureSince([]string{"raft_storage", "fsm", "open_db_file"}, start)
 | |
| 
 | |
| 	err = boltDB.Update(func(tx *bolt.Tx) error {
 | |
| 		// make sure we have the necessary buckets created
 | |
| 		_, err := tx.CreateBucketIfNotExists(dataBucketName)
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("failed to create bucket: %v", err)
 | |
| 		}
 | |
| 		b, err := tx.CreateBucketIfNotExists(configBucketName)
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("failed to create bucket: %v", err)
 | |
| 		}
 | |
| 
 | |
| 		// Read in our latest index and term and populate it inmemory
 | |
| 		val := b.Get(latestIndexKey)
 | |
| 		if val != nil {
 | |
| 			var latest IndexValue
 | |
| 			err := proto.Unmarshal(val, &latest)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			atomic.StoreUint64(f.latestTerm, latest.Term)
 | |
| 			atomic.StoreUint64(f.latestIndex, latest.Index)
 | |
| 		}
 | |
| 
 | |
| 		// Read in our latest config and populate it inmemory
 | |
| 		val = b.Get(latestConfigKey)
 | |
| 		if val != nil {
 | |
| 			var latest ConfigurationValue
 | |
| 			err := proto.Unmarshal(val, &latest)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			f.latestConfig.Store(&latest)
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	f.db = boltDB
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (f *FSM) Close() error {
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	return f.db.Close()
 | |
| }
 | |
| 
 | |
| func writeSnapshotMetaToDB(metadata *raft.SnapshotMeta, db *bolt.DB) error {
 | |
| 	latestIndex := &IndexValue{
 | |
| 		Term:  metadata.Term,
 | |
| 		Index: metadata.Index,
 | |
| 	}
 | |
| 	indexBytes, err := proto.Marshal(latestIndex)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	protoConfig := raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration)
 | |
| 	configBytes, err := proto.Marshal(protoConfig)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	err = db.Update(func(tx *bolt.Tx) error {
 | |
| 		b, err := tx.CreateBucketIfNotExists(configBucketName)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		err = b.Put(latestConfigKey, configBytes)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		err = b.Put(latestIndexKey, indexBytes)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (f *FSM) localNodeConfig() (*LocalNodeConfigValue, error) {
 | |
| 	var configBytes []byte
 | |
| 	if err := f.db.View(func(tx *bolt.Tx) error {
 | |
| 		value := tx.Bucket(configBucketName).Get(localNodeConfigKey)
 | |
| 		if value != nil {
 | |
| 			configBytes = make([]byte, len(value))
 | |
| 			copy(configBytes, value)
 | |
| 		}
 | |
| 		return nil
 | |
| 	}); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if configBytes == nil {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 
 | |
| 	var lnConfig LocalNodeConfigValue
 | |
| 	if configBytes != nil {
 | |
| 		err := proto.Unmarshal(configBytes, &lnConfig)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		f.desiredSuffrage = lnConfig.DesiredSuffrage
 | |
| 		return &lnConfig, nil
 | |
| 	}
 | |
| 
 | |
| 	return nil, nil
 | |
| }
 | |
| 
 | |
| func (f *FSM) DesiredSuffrage() string {
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	return f.desiredSuffrage
 | |
| }
 | |
| 
 | |
| func (f *FSM) upgradeLocalNodeConfig() error {
 | |
| 	f.l.Lock()
 | |
| 	defer f.l.Unlock()
 | |
| 
 | |
| 	// Read the local node config
 | |
| 	lnConfig, err := f.localNodeConfig()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Entry is already present. Get the suffrage value.
 | |
| 	if lnConfig != nil {
 | |
| 		f.desiredSuffrage = lnConfig.DesiredSuffrage
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	//
 | |
| 	// This is the upgrade case where there is no entry
 | |
| 	//
 | |
| 
 | |
| 	lnConfig = &LocalNodeConfigValue{}
 | |
| 
 | |
| 	// Refer to the persisted latest raft config
 | |
| 	config := f.latestConfig.Load().(*ConfigurationValue)
 | |
| 
 | |
| 	// If there is no config, then this is a fresh node coming up. This could end up
 | |
| 	// being a voter or non-voter. But by default assume that this is a voter. It
 | |
| 	// will be changed if this node joins the cluster as a non-voter.
 | |
| 	if config == nil {
 | |
| 		f.desiredSuffrage = "voter"
 | |
| 		lnConfig.DesiredSuffrage = f.desiredSuffrage
 | |
| 		return f.persistDesiredSuffrage(lnConfig)
 | |
| 	}
 | |
| 
 | |
| 	// Get the last known suffrage of the node and assume that it is the desired
 | |
| 	// suffrage. There is no better alternative here.
 | |
| 	for _, srv := range config.Servers {
 | |
| 		if srv.Id == f.localID {
 | |
| 			switch srv.Suffrage {
 | |
| 			case int32(raft.Nonvoter):
 | |
| 				lnConfig.DesiredSuffrage = "non-voter"
 | |
| 			default:
 | |
| 				lnConfig.DesiredSuffrage = "voter"
 | |
| 			}
 | |
| 			// Bring the intent to the fsm instance.
 | |
| 			f.desiredSuffrage = lnConfig.DesiredSuffrage
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return f.persistDesiredSuffrage(lnConfig)
 | |
| }
 | |
| 
 | |
| // recordSuffrage is called when a node successfully joins the cluster. This
 | |
| // intent should land in the stored configuration. If the config isn't available
 | |
| // yet, we still go ahead and store the intent in the fsm. During the next
 | |
| // update to the configuration, this intent will be persisted.
 | |
| func (f *FSM) recordSuffrage(desiredSuffrage string) error {
 | |
| 	f.l.Lock()
 | |
| 	defer f.l.Unlock()
 | |
| 
 | |
| 	if err := f.persistDesiredSuffrage(&LocalNodeConfigValue{
 | |
| 		DesiredSuffrage: desiredSuffrage,
 | |
| 	}); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	f.desiredSuffrage = desiredSuffrage
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (f *FSM) persistDesiredSuffrage(lnconfig *LocalNodeConfigValue) error {
 | |
| 	dsBytes, err := proto.Marshal(lnconfig)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return f.db.Update(func(tx *bolt.Tx) error {
 | |
| 		return tx.Bucket(configBucketName).Put(localNodeConfigKey, dsBytes)
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error {
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	err := writeSnapshotMetaToDB(metadata, f.db)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	atomic.StoreUint64(f.latestIndex, metadata.Index)
 | |
| 	atomic.StoreUint64(f.latestTerm, metadata.Term)
 | |
| 	f.latestConfig.Store(raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration))
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // LatestState returns the latest index and configuration values we have seen on
 | |
| // this FSM.
 | |
| func (f *FSM) LatestState() (*IndexValue, *ConfigurationValue) {
 | |
| 	return &IndexValue{
 | |
| 		Term:  atomic.LoadUint64(f.latestTerm),
 | |
| 		Index: atomic.LoadUint64(f.latestIndex),
 | |
| 	}, f.latestConfig.Load().(*ConfigurationValue)
 | |
| }
 | |
| 
 | |
| // Delete deletes the given key from the bolt file.
 | |
| func (f *FSM) Delete(ctx context.Context, path string) error {
 | |
| 	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete"}, time.Now())
 | |
| 
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	return f.db.Update(func(tx *bolt.Tx) error {
 | |
| 		return tx.Bucket(dataBucketName).Delete([]byte(path))
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // Delete deletes the given key from the bolt file.
 | |
| func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
 | |
| 	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete_prefix"}, time.Now())
 | |
| 
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	err := f.db.Update(func(tx *bolt.Tx) error {
 | |
| 		// Assume bucket exists and has keys
 | |
| 		c := tx.Bucket(dataBucketName).Cursor()
 | |
| 
 | |
| 		prefixBytes := []byte(prefix)
 | |
| 		for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() {
 | |
| 			if err := c.Delete(); err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Get retrieves the value at the given path from the bolt file.
 | |
| func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
 | |
| 	// TODO: Remove this outdated metric name in an older release
 | |
| 	defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())
 | |
| 	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "get"}, time.Now())
 | |
| 
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	var valCopy []byte
 | |
| 	var found bool
 | |
| 
 | |
| 	err := f.db.View(func(tx *bolt.Tx) error {
 | |
| 		value := tx.Bucket(dataBucketName).Get([]byte(path))
 | |
| 		if value != nil {
 | |
| 			found = true
 | |
| 			valCopy = make([]byte, len(value))
 | |
| 			copy(valCopy, value)
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if !found {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 
 | |
| 	return &physical.Entry{
 | |
| 		Key:   path,
 | |
| 		Value: valCopy,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // Put writes the given entry to the bolt file.
 | |
| func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
 | |
| 	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "put"}, time.Now())
 | |
| 
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	// Start a write transaction.
 | |
| 	return f.db.Update(func(tx *bolt.Tx) error {
 | |
| 		return tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value)
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // List retrieves the set of keys with the given prefix from the bolt file.
 | |
| func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
 | |
| 	// TODO: Remove this outdated metric name in a future release
 | |
| 	defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())
 | |
| 	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now())
 | |
| 
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	var keys []string
 | |
| 
 | |
| 	err := f.db.View(func(tx *bolt.Tx) error {
 | |
| 		// Assume bucket exists and has keys
 | |
| 		c := tx.Bucket(dataBucketName).Cursor()
 | |
| 
 | |
| 		prefixBytes := []byte(prefix)
 | |
| 		for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() {
 | |
| 			key := string(k)
 | |
| 			key = strings.TrimPrefix(key, prefix)
 | |
| 			if i := strings.Index(key, "/"); i == -1 {
 | |
| 				// Add objects only from the current 'folder'
 | |
| 				keys = append(keys, key)
 | |
| 			} else {
 | |
| 				// Add truncated 'folder' paths
 | |
| 				if len(keys) == 0 || keys[len(keys)-1] != key[:i+1] {
 | |
| 					keys = append(keys, string(key[:i+1]))
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	return keys, err
 | |
| }
 | |
| 
 | |
| // Transaction writes all the operations in the provided transaction to the bolt
 | |
| // file.
 | |
| func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	// Start a write transaction.
 | |
| 	err := f.db.Update(func(tx *bolt.Tx) error {
 | |
| 		b := tx.Bucket(dataBucketName)
 | |
| 		for _, txn := range txns {
 | |
| 			var err error
 | |
| 			switch txn.Operation {
 | |
| 			case physical.PutOperation:
 | |
| 				err = b.Put([]byte(txn.Entry.Key), txn.Entry.Value)
 | |
| 			case physical.DeleteOperation:
 | |
| 				err = b.Delete([]byte(txn.Entry.Key))
 | |
| 			default:
 | |
| 				return fmt.Errorf("%q is not a supported transaction operation", txn.Operation)
 | |
| 			}
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // ApplyBatch will apply a set of logs to the FSM. This is called from the raft
 | |
| // library.
 | |
| func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
 | |
| 	numLogs := len(logs)
 | |
| 
 | |
| 	if numLogs == 0 {
 | |
| 		return []interface{}{}
 | |
| 	}
 | |
| 
 | |
| 	// We will construct one slice per log, each slice containing another slice of results from our get ops
 | |
| 	entrySlices := make([][]*FSMEntry, 0, numLogs)
 | |
| 
 | |
| 	// Do the unmarshalling first so we don't hold locks
 | |
| 	var latestConfiguration *ConfigurationValue
 | |
| 	commands := make([]interface{}, 0, numLogs)
 | |
| 	for _, log := range logs {
 | |
| 		switch log.Type {
 | |
| 		case raft.LogCommand:
 | |
| 			command := &LogData{}
 | |
| 			err := proto.Unmarshal(log.Data, command)
 | |
| 			if err != nil {
 | |
| 				f.logger.Error("error proto unmarshaling log data", "error", err)
 | |
| 				panic("error proto unmarshaling log data")
 | |
| 			}
 | |
| 			commands = append(commands, command)
 | |
| 		case raft.LogConfiguration:
 | |
| 			configuration := raft.DecodeConfiguration(log.Data)
 | |
| 			config := raftConfigurationToProtoConfiguration(log.Index, configuration)
 | |
| 
 | |
| 			commands = append(commands, config)
 | |
| 
 | |
| 			// Update the latest configuration the fsm has received; we will
 | |
| 			// store this after it has been committed to storage.
 | |
| 			latestConfiguration = config
 | |
| 
 | |
| 		default:
 | |
| 			panic(fmt.Sprintf("got unexpected log type: %d", log.Type))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Only advance latest pointer if this log has a higher index value than
 | |
| 	// what we have seen in the past.
 | |
| 	var logIndex []byte
 | |
| 	var err error
 | |
| 	latestIndex, _ := f.LatestState()
 | |
| 	lastLog := logs[numLogs-1]
 | |
| 	if latestIndex.Index < lastLog.Index {
 | |
| 		logIndex, err = proto.Marshal(&IndexValue{
 | |
| 			Term:  lastLog.Term,
 | |
| 			Index: lastLog.Index,
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			f.logger.Error("unable to marshal latest index", "error", err)
 | |
| 			panic("unable to marshal latest index")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	if f.applyCallback != nil {
 | |
| 		f.applyCallback()
 | |
| 	}
 | |
| 
 | |
| 	err = f.db.Update(func(tx *bolt.Tx) error {
 | |
| 		b := tx.Bucket(dataBucketName)
 | |
| 		for _, commandRaw := range commands {
 | |
| 			entrySlice := make([]*FSMEntry, 0)
 | |
| 			switch command := commandRaw.(type) {
 | |
| 			case *LogData:
 | |
| 				for _, op := range command.Operations {
 | |
| 					var err error
 | |
| 					switch op.OpType {
 | |
| 					case putOp:
 | |
| 						err = b.Put([]byte(op.Key), op.Value)
 | |
| 					case deleteOp:
 | |
| 						err = b.Delete([]byte(op.Key))
 | |
| 					case getOp:
 | |
| 						fsmEntry := &FSMEntry{
 | |
| 							Key: op.Key,
 | |
| 						}
 | |
| 						val := b.Get([]byte(op.Key))
 | |
| 						if len(val) > 0 {
 | |
| 							newVal := make([]byte, len(val))
 | |
| 							copy(newVal, val)
 | |
| 							fsmEntry.Value = newVal
 | |
| 						}
 | |
| 						entrySlice = append(entrySlice, fsmEntry)
 | |
| 					case restoreCallbackOp:
 | |
| 						if f.restoreCb != nil {
 | |
| 							// Kick off the restore callback function in a go routine
 | |
| 							go f.restoreCb(context.Background())
 | |
| 						}
 | |
| 					default:
 | |
| 						return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
 | |
| 					}
 | |
| 					if err != nil {
 | |
| 						return err
 | |
| 					}
 | |
| 				}
 | |
| 
 | |
| 			case *ConfigurationValue:
 | |
| 				b := tx.Bucket(configBucketName)
 | |
| 				configBytes, err := proto.Marshal(command)
 | |
| 				if err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 				if err := b.Put(latestConfigKey, configBytes); err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			entrySlices = append(entrySlices, entrySlice)
 | |
| 		}
 | |
| 
 | |
| 		if len(logIndex) > 0 {
 | |
| 			b := tx.Bucket(configBucketName)
 | |
| 			err = b.Put(latestIndexKey, logIndex)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		f.logger.Error("failed to store data", "error", err)
 | |
| 		panic("failed to store data")
 | |
| 	}
 | |
| 
 | |
| 	// If we advanced the latest value, update the in-memory representation too.
 | |
| 	if len(logIndex) > 0 {
 | |
| 		atomic.StoreUint64(f.latestTerm, lastLog.Term)
 | |
| 		atomic.StoreUint64(f.latestIndex, lastLog.Index)
 | |
| 	}
 | |
| 
 | |
| 	// If one or more configuration changes were processed, store the latest one.
 | |
| 	if latestConfiguration != nil {
 | |
| 		f.latestConfig.Store(latestConfiguration)
 | |
| 	}
 | |
| 
 | |
| 	// Build the responses. The logs array is used here to ensure we reply to
 | |
| 	// all command values; even if they are not of the types we expect. This
 | |
| 	// should futureproof this function from more log types being provided.
 | |
| 	resp := make([]interface{}, numLogs)
 | |
| 	for i := range logs {
 | |
| 		resp[i] = &FSMApplyResponse{
 | |
| 			Success:    true,
 | |
| 			EntrySlice: entrySlices[i],
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return resp
 | |
| }
 | |
| 
 | |
| // Apply will apply a log value to the FSM. This is called from the raft
 | |
| // library.
 | |
| func (f *FSM) Apply(log *raft.Log) interface{} {
 | |
| 	return f.ApplyBatch([]*raft.Log{log})[0]
 | |
| }
 | |
| 
 | |
| type writeErrorCloser interface {
 | |
| 	io.WriteCloser
 | |
| 	CloseWithError(error) error
 | |
| }
 | |
| 
 | |
| // writeTo will copy the FSM's content to a remote sink. The data is written
 | |
| // twice, once for use in determining various metadata attributes of the dataset
 | |
| // (size, checksum, etc) and a second for the sink of the data. We also use a
 | |
| // proto delimited writer so we can stream proto messages to the sink.
 | |
| func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) {
 | |
| 	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "write_snapshot"}, time.Now())
 | |
| 
 | |
| 	protoWriter := NewDelimitedWriter(sink)
 | |
| 	metadataProtoWriter := NewDelimitedWriter(metaSink)
 | |
| 
 | |
| 	f.l.RLock()
 | |
| 	defer f.l.RUnlock()
 | |
| 
 | |
| 	err := f.db.View(func(tx *bolt.Tx) error {
 | |
| 		b := tx.Bucket(dataBucketName)
 | |
| 
 | |
| 		c := b.Cursor()
 | |
| 
 | |
| 		// Do the first scan of the data for metadata purposes.
 | |
| 		for k, v := c.First(); k != nil; k, v = c.Next() {
 | |
| 			err := metadataProtoWriter.WriteMsg(&pb.StorageEntry{
 | |
| 				Key:   string(k),
 | |
| 				Value: v,
 | |
| 			})
 | |
| 			if err != nil {
 | |
| 				metaSink.CloseWithError(err)
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 		metaSink.Close()
 | |
| 
 | |
| 		// Do the second scan for copy purposes.
 | |
| 		for k, v := c.First(); k != nil; k, v = c.Next() {
 | |
| 			err := protoWriter.WriteMsg(&pb.StorageEntry{
 | |
| 				Key:   string(k),
 | |
| 				Value: v,
 | |
| 			})
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 	sink.CloseWithError(err)
 | |
| }
 | |
| 
 | |
| // Snapshot implements the FSM interface. It returns a noop snapshot object.
 | |
| func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
 | |
| 	return &noopSnapshotter{
 | |
| 		fsm: f,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // SetNoopRestore is used to disable restore operations on raft startup. Because
 | |
| // we are using persistent storage in our FSM we do not need to issue a restore
 | |
| // on startup.
 | |
| func (f *FSM) SetNoopRestore(enabled bool) {
 | |
| 	f.l.Lock()
 | |
| 	f.noopRestore = enabled
 | |
| 	f.l.Unlock()
 | |
| }
 | |
| 
 | |
| // Restore installs a new snapshot from the provided reader. It does an atomic
 | |
| // rename of the snapshot file into the database filepath. While a restore is
 | |
| // happening the FSM is locked and no writes or reads can be performed.
 | |
| func (f *FSM) Restore(r io.ReadCloser) error {
 | |
| 	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "restore_snapshot"}, time.Now())
 | |
| 
 | |
| 	if f.noopRestore {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	snapshotInstaller, ok := r.(*boltSnapshotInstaller)
 | |
| 	if !ok {
 | |
| 		wrapper, ok := r.(raft.ReadCloserWrapper)
 | |
| 		if !ok {
 | |
| 			return fmt.Errorf("expected ReadCloserWrapper object, got: %T", r)
 | |
| 		}
 | |
| 		snapshotInstallerRaw := wrapper.WrappedReadCloser()
 | |
| 		snapshotInstaller, ok = snapshotInstallerRaw.(*boltSnapshotInstaller)
 | |
| 		if !ok {
 | |
| 			return fmt.Errorf("expected snapshot installer object, got: %T", snapshotInstallerRaw)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	f.l.Lock()
 | |
| 	defer f.l.Unlock()
 | |
| 
 | |
| 	// Cache the local node config before closing the db file
 | |
| 	lnConfig, err := f.localNodeConfig()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Close the db file
 | |
| 	if err := f.db.Close(); err != nil {
 | |
| 		f.logger.Error("failed to close database file", "error", err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	dbPath := filepath.Join(f.path, databaseFilename)
 | |
| 
 | |
| 	f.logger.Info("installing snapshot to FSM")
 | |
| 
 | |
| 	// Install the new boltdb file
 | |
| 	var retErr *multierror.Error
 | |
| 	if err := snapshotInstaller.Install(dbPath); err != nil {
 | |
| 		f.logger.Error("failed to install snapshot", "error", err)
 | |
| 		retErr = multierror.Append(retErr, fmt.Errorf("failed to install snapshot database: %w", err))
 | |
| 	} else {
 | |
| 		f.logger.Info("snapshot installed")
 | |
| 	}
 | |
| 
 | |
| 	// Open the db file. We want to do this regardless of if the above install
 | |
| 	// worked. If the install failed we should try to open the old DB file.
 | |
| 	if err := f.openDBFile(dbPath); err != nil {
 | |
| 		f.logger.Error("failed to open new database file", "error", err)
 | |
| 		retErr = multierror.Append(retErr, fmt.Errorf("failed to open new bolt file: %w", err))
 | |
| 	}
 | |
| 
 | |
| 	// Handle local node config restore. lnConfig should not be nil here, but
 | |
| 	// adding the nil check anyways for safety.
 | |
| 	if lnConfig != nil {
 | |
| 		// Persist the local node config on the restored fsm.
 | |
| 		if err := f.persistDesiredSuffrage(lnConfig); err != nil {
 | |
| 			f.logger.Error("failed to persist local node config from before the restore", "error", err)
 | |
| 			retErr = multierror.Append(retErr, fmt.Errorf("failed to persist local node config from before the restore: %w", err))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return retErr.ErrorOrNil()
 | |
| }
 | |
| 
 | |
| // noopSnapshotter implements the fsm.Snapshot interface. It doesn't do anything
 | |
| // since our SnapshotStore reads data out of the FSM on Open().
 | |
| type noopSnapshotter struct {
 | |
| 	fsm *FSM
 | |
| }
 | |
| 
 | |
| // Persist implements the fsm.Snapshot interface. It doesn't need to persist any
 | |
| // state data, but it does persist the raft metadata. This is necessary so we
 | |
| // can be sure to capture indexes for operation types that are not sent to the
 | |
| // FSM.
 | |
| func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error {
 | |
| 	boltSnapshotSink := sink.(*BoltSnapshotSink)
 | |
| 
 | |
| 	// We are processing a snapshot, fastforward the index, term, and
 | |
| 	// configuration to the latest seen by the raft system.
 | |
| 	if err := s.fsm.witnessSnapshot(&boltSnapshotSink.meta); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Release doesn't do anything.
 | |
| func (s *noopSnapshotter) Release() {}
 | |
| 
 | |
| // raftConfigurationToProtoConfiguration converts a raft configuration object to
 | |
| // a proto value.
 | |
| func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue {
 | |
| 	servers := make([]*Server, len(configuration.Servers))
 | |
| 	for i, s := range configuration.Servers {
 | |
| 		servers[i] = &Server{
 | |
| 			Suffrage: int32(s.Suffrage),
 | |
| 			Id:       string(s.ID),
 | |
| 			Address:  string(s.Address),
 | |
| 		}
 | |
| 	}
 | |
| 	return &ConfigurationValue{
 | |
| 		Index:   index,
 | |
| 		Servers: servers,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // protoConfigurationToRaftConfiguration converts a proto configuration object
 | |
| // to a raft object.
 | |
| func protoConfigurationToRaftConfiguration(configuration *ConfigurationValue) (uint64, raft.Configuration) {
 | |
| 	servers := make([]raft.Server, len(configuration.Servers))
 | |
| 	for i, s := range configuration.Servers {
 | |
| 		servers[i] = raft.Server{
 | |
| 			Suffrage: raft.ServerSuffrage(s.Suffrage),
 | |
| 			ID:       raft.ServerID(s.Id),
 | |
| 			Address:  raft.ServerAddress(s.Address),
 | |
| 		}
 | |
| 	}
 | |
| 	return configuration.Index, raft.Configuration{
 | |
| 		Servers: servers,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type FSMChunkStorage struct {
 | |
| 	f   *FSM
 | |
| 	ctx context.Context
 | |
| }
 | |
| 
 | |
| // chunkPaths returns a disk prefix and key given chunkinfo
 | |
| func (f *FSMChunkStorage) chunkPaths(chunk *raftchunking.ChunkInfo) (string, string) {
 | |
| 	prefix := fmt.Sprintf("%s%d/", chunkingPrefix, chunk.OpNum)
 | |
| 	key := fmt.Sprintf("%s%d", prefix, chunk.SequenceNum)
 | |
| 	return prefix, key
 | |
| }
 | |
| 
 | |
| func (f *FSMChunkStorage) StoreChunk(chunk *raftchunking.ChunkInfo) (bool, error) {
 | |
| 	b, err := jsonutil.EncodeJSON(chunk)
 | |
| 	if err != nil {
 | |
| 		return false, fmt.Errorf("error encoding chunk info: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	prefix, key := f.chunkPaths(chunk)
 | |
| 
 | |
| 	entry := &physical.Entry{
 | |
| 		Key:   key,
 | |
| 		Value: b,
 | |
| 	}
 | |
| 
 | |
| 	f.f.l.RLock()
 | |
| 	defer f.f.l.RUnlock()
 | |
| 
 | |
| 	// Start a write transaction.
 | |
| 	done := new(bool)
 | |
| 	if err := f.f.db.Update(func(tx *bolt.Tx) error {
 | |
| 		if err := tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value); err != nil {
 | |
| 			return fmt.Errorf("error storing chunk info: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		// Assume bucket exists and has keys
 | |
| 		c := tx.Bucket(dataBucketName).Cursor()
 | |
| 
 | |
| 		var keys []string
 | |
| 		prefixBytes := []byte(prefix)
 | |
| 		for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() {
 | |
| 			key := string(k)
 | |
| 			key = strings.TrimPrefix(key, prefix)
 | |
| 			if i := strings.Index(key, "/"); i == -1 {
 | |
| 				// Add objects only from the current 'folder'
 | |
| 				keys = append(keys, key)
 | |
| 			} else {
 | |
| 				// Add truncated 'folder' paths
 | |
| 				keys = strutil.AppendIfMissing(keys, string(key[:i+1]))
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		*done = uint32(len(keys)) == chunk.NumChunks
 | |
| 
 | |
| 		return nil
 | |
| 	}); err != nil {
 | |
| 		return false, err
 | |
| 	}
 | |
| 
 | |
| 	return *done, nil
 | |
| }
 | |
| 
 | |
| func (f *FSMChunkStorage) FinalizeOp(opNum uint64) ([]*raftchunking.ChunkInfo, error) {
 | |
| 	ret, err := f.chunksForOpNum(opNum)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("error getting chunks for op keys: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	prefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum})
 | |
| 	if err := f.f.DeletePrefix(f.ctx, prefix); err != nil {
 | |
| 		return nil, fmt.Errorf("error deleting prefix after op finalization: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return ret, nil
 | |
| }
 | |
| 
 | |
| func (f *FSMChunkStorage) chunksForOpNum(opNum uint64) ([]*raftchunking.ChunkInfo, error) {
 | |
| 	prefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum})
 | |
| 
 | |
| 	opChunkKeys, err := f.f.List(f.ctx, prefix)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("error fetching op chunk keys: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	if len(opChunkKeys) == 0 {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 
 | |
| 	var ret []*raftchunking.ChunkInfo
 | |
| 
 | |
| 	for _, v := range opChunkKeys {
 | |
| 		seqNum, err := strconv.ParseInt(v, 10, 64)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("error converting seqnum to integer: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		entry, err := f.f.Get(f.ctx, prefix+v)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("error fetching chunkinfo: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		var ci raftchunking.ChunkInfo
 | |
| 		if err := jsonutil.DecodeJSON(entry.Value, &ci); err != nil {
 | |
| 			return nil, fmt.Errorf("error decoding chunkinfo json: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		if ret == nil {
 | |
| 			ret = make([]*raftchunking.ChunkInfo, ci.NumChunks)
 | |
| 		}
 | |
| 
 | |
| 		ret[seqNum] = &ci
 | |
| 	}
 | |
| 
 | |
| 	return ret, nil
 | |
| }
 | |
| 
 | |
| func (f *FSMChunkStorage) GetChunks() (raftchunking.ChunkMap, error) {
 | |
| 	opNums, err := f.f.List(f.ctx, chunkingPrefix)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("error doing recursive list for chunk saving: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	if len(opNums) == 0 {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 
 | |
| 	ret := make(raftchunking.ChunkMap, len(opNums))
 | |
| 	for _, opNumStr := range opNums {
 | |
| 		opNum, err := strconv.ParseInt(opNumStr, 10, 64)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("error parsing op num during chunk saving: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		opChunks, err := f.chunksForOpNum(uint64(opNum))
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("error getting chunks for op keys during chunk saving: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		ret[uint64(opNum)] = opChunks
 | |
| 	}
 | |
| 
 | |
| 	return ret, nil
 | |
| }
 | |
| 
 | |
| func (f *FSMChunkStorage) RestoreChunks(chunks raftchunking.ChunkMap) error {
 | |
| 	if err := f.f.DeletePrefix(f.ctx, chunkingPrefix); err != nil {
 | |
| 		return fmt.Errorf("error deleting prefix for chunk restoration: %w", err)
 | |
| 	}
 | |
| 	if len(chunks) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	for opNum, opChunks := range chunks {
 | |
| 		for _, chunk := range opChunks {
 | |
| 			if chunk == nil {
 | |
| 				continue
 | |
| 			}
 | |
| 			if chunk.OpNum != opNum {
 | |
| 				return errors.New("unexpected op number in chunk")
 | |
| 			}
 | |
| 			if _, err := f.StoreChunk(chunk); err != nil {
 | |
| 				return fmt.Errorf("error storing chunk during restoration: %w", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |