2022-04-25 12:23:57 -05:00

171 lines
5.0 KiB
Go

// Package state manages the state that the singleprocess server has, providing
// operations to mutate that state safely as needed.
package state
import (
"crypto/rand"
"fmt"
"reflect"
"time"
"github.com/boltdb/bolt"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/oklog/ulid/v2"
)
// The global variables below can be set by init() functions of other
// files in this package to setup the database state for the server.
var (
// schemas is used to register schemas with the state store. Other files should
// use the init() callback to append to this.
schemas []schemaFn
// dbBuckets is the list of buckets that should be created by dbInit.
// Various components should use init() funcs to append to this.
dbBuckets [][]byte
// dbIndexers is the list of functions to call to initialize the
// in-memory indexes from the persisted db.
dbIndexers []indexFn
entropy = rand.Reader
)
// State is the primary API for state mutation for the server.
type State struct {
// inmem is our in-memory database that stores ephemeral data in an
// easier-to-query way. Some of this data may be periodically persisted
// but most of this data is meant to be lost when the process restarts.
inmem *memdb.MemDB
// db is our persisted on-disk database. This stores the bulk of data
// and supports a transactional model for safe concurrent access.
// inmem is used alongside db to store in-memory indexing information
// for more efficient lookups into db. This index is built online at
// boot.
db *bolt.DB
// hmacKeyNotEmpty is flipped to 1 when an hmac entry is set. This is
// used to determine if we're in a bootstrap state and can create a
// bootstrap token.
hmacKeyNotEmpty uint32
// indexers is used to track whether an indexer was called. This is
// initialized during New and set to nil at the end of New.
indexers map[uintptr]struct{}
// Where to log to
log hclog.Logger
}
// New initializes a new State store.
func New(log hclog.Logger, db *bolt.DB) (*State, error) {
// Restore DB if necessary
db, err := finalizeRestore(log, db)
if err != nil {
log.Trace("failure encountered during finalize restore", "error", err)
return nil, err
}
// Create the in-memory DB.
inmem, err := memdb.NewMemDB(stateStoreSchema())
if err != nil {
log.Trace("failed to setup in-memory database", "error", err)
return nil, fmt.Errorf("Failed setting up state store: %s", err)
}
// Initialize and validate our on-disk format.
if err := dbInit(db); err != nil {
log.Error("failed to initialize and validate on-disk format", "error", err)
return nil, err
}
s := &State{inmem: inmem, db: db, log: log}
// Initialize our set that'll track what memdb indexers we call.
// When we're done we always clear this out since it is never used
// again.
s.indexers = make(map[uintptr]struct{})
defer func() { s.indexers = nil }()
// Initialize our in-memory indexes
memTxn := s.inmem.Txn(true)
defer memTxn.Abort()
err = s.db.View(func(dbTxn *bolt.Tx) error {
for _, indexer := range dbIndexers {
// TODO: this should use callIndexer but it's broken as it prevents the multiple op indexers
// from properly running.
if err := indexer(s, dbTxn, memTxn); err != nil {
return err
}
}
return nil
})
if err != nil {
log.Error("failed to generate in memory index", "error", err)
return nil, err
}
memTxn.Commit()
return s, nil
}
// callIndexer calls the specified indexer exactly once. If it has been called
// before this returns no error. This must not be called concurrently. This
// can be used from indexers to ensure other data is indexed first.
func (s *State) callIndexer(fn indexFn, dbTxn *bolt.Tx, memTxn *memdb.Txn) error {
fnptr := reflect.ValueOf(fn).Pointer()
if _, ok := s.indexers[fnptr]; ok {
return nil
}
s.indexers[fnptr] = struct{}{}
return fn(s, dbTxn, memTxn)
}
// Close should be called to gracefully close any resources.
func (s *State) Close() error {
return s.db.Close()
}
// schemaFn is an interface function used to create and return new memdb schema
// structs for constructing an in-memory db.
type schemaFn func() *memdb.TableSchema
// stateStoreSchema is used to return the combined schema for the state store.
func stateStoreSchema() *memdb.DBSchema {
// Create the root DB schema
db := &memdb.DBSchema{
Tables: make(map[string]*memdb.TableSchema),
}
// Add the tables to the root schema
for _, fn := range schemas {
schema := fn()
if _, ok := db.Tables[schema.Name]; ok {
panic(fmt.Sprintf("duplicate table name: %s", schema.Name))
}
db.Tables[schema.Name] = schema
}
return db
}
// indexFn is the function type for initializing in-memory indexes from
// persisted data. This is usually specified as a method handle to a
// *State method.
//
// The bolt.Tx is read-only while the memdb.Txn is a write transaction.
type indexFn func(*State, *bolt.Tx, *memdb.Txn) error
func (*State) newResourceId() (string, error) {
id, err := ulid.New(ulid.Timestamp(time.Now()), entropy)
if err != nil {
return "", err
}
return id.String(), nil
}