From 772e786eee2d476b1785e6b580791d5ab40d2514 Mon Sep 17 00:00:00 2001 From: sophia Date: Fri, 17 Sep 2021 13:46:00 -0500 Subject: [PATCH] Import Waypoint state pruning code ref: https://github.com/hashicorp/waypoint/blob/b333b059a0473b81dd1158390ac054e857a39bb7/internal/server/singleprocess/state/prune.go https://github.com/hashicorp/waypoint/blob/b333b059a0473b81dd1158390ac054e857a39bb7/internal/server/singleprocess/state/prune_test.go https://github.com/hashicorp/waypoint/blob/4f7174861053bb0bf51a17a26d56120867c5b7b1/internal/server/singleprocess/prune.go Squash: import prune --- internal/server/singleprocess/prune.go | 35 +++ internal/server/singleprocess/state/job.go | 1 + internal/server/singleprocess/state/prune.go | 99 ++++++++ .../server/singleprocess/state/prune_test.go | 217 ++++++++++++++++++ internal/server/singleprocess/state/state.go | 19 ++ 5 files changed, 371 insertions(+) create mode 100644 internal/server/singleprocess/prune.go create mode 100644 internal/server/singleprocess/state/prune.go create mode 100644 internal/server/singleprocess/state/prune_test.go diff --git a/internal/server/singleprocess/prune.go b/internal/server/singleprocess/prune.go new file mode 100644 index 000000000..220a7fe0f --- /dev/null +++ b/internal/server/singleprocess/prune.go @@ -0,0 +1,35 @@ +package singleprocess + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/go-hclog" +) + +func (s *service) runPrune( + ctx context.Context, + wg *sync.WaitGroup, + funclog hclog.Logger, +) { + defer wg.Done() + + funclog.Info("starting") + defer funclog.Info("exiting") + + tk := time.NewTicker(10 * time.Minute) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + err := s.state.Prune() + if err != nil { + funclog.Error("error pruning data", "error", err) + } + } + } +} diff --git a/internal/server/singleprocess/state/job.go b/internal/server/singleprocess/state/job.go index b795e6e80..d1f5fb56a 100644 --- 
a/internal/server/singleprocess/state/job.go +++ b/internal/server/singleprocess/state/job.go @@ -33,6 +33,7 @@ const ( jobStateIndexName = "state" jobQueueTimeIndexName = "queue-time" jobTargetIdIndexName = "target-id" + maximumJobsIndexed = 10000 ) func init() { diff --git a/internal/server/singleprocess/state/prune.go b/internal/server/singleprocess/state/prune.go new file mode 100644 index 000000000..7e74e1f45 --- /dev/null +++ b/internal/server/singleprocess/state/prune.go @@ -0,0 +1,99 @@ +package state + +import ( + "sync" + + validation "github.com/go-ozzo/ozzo-validation/v4" + "github.com/hashicorp/go-memdb" +) + +type pruneOp struct { + lock *sync.Mutex + table, index string + indexArgs []interface{} + max int + cur *int + check func(val interface{}) bool +} + +func (p *pruneOp) Validate() error { + return validation.ValidateStruct(p, + validation.Field(&p.lock, validation.Required), + validation.Field(&p.table, validation.Required), + validation.Field(&p.index, validation.Required), + validation.Field(&p.cur, validation.NilOrNotEmpty), + ) +} + +// pruneOld uses the types in op to scan the table indicated and prune old records. +// The op's check function can allow the process to skip records that shouldn't be +// pruned regardless of their age. +func pruneOld(memTxn *memdb.Txn, op pruneOp) (int, error) { + if err := op.Validate(); err != nil { + return 0, err + } + + op.lock.Lock() + + // Easy enough, just exit if we haven't hit the maximum + if *op.cur <= op.max { + op.lock.Unlock() + return 0, nil + } + + // Calculate how many jobs we need to prune to get back to the maximum. + pruneCnt := *op.cur - op.max + + // Unlock the prune lock for the bulk of the work so we don't prevent new work + // from starting while the prune is taking place. + op.lock.Unlock() + + // Now we iterate the jobs, starting with the queue time that is furthest in the past + // (ie, delete the oldest records first). 
+ iter, err := memTxn.LowerBound(op.table, op.index, op.indexArgs...) + if err != nil { + return 0, err + } + + // Track the total values deleted separately because we can exit early: + // we might want to prune, say, 100 but only be able to prune 50, + // and need to know the exact number. + var deleted int + +pruning: + for { + raw := iter.Next() + if raw == nil { + break + } + + if op.check != nil && op.check(raw) { + continue pruning + } + + // otherwise, prune this job! Once we've pruned enough jobs to get back + // to the maximum, we stop pruning. + pruneCnt-- + + err = memTxn.Delete(op.table, raw) + if err != nil { + return 0, err + } + + deleted++ + if pruneCnt <= 0 { + break + } + } + + // Grab the lock and update indexedJobs value + op.lock.Lock() + defer op.lock.Unlock() + + // We subtract the diff here because while prune was running, new jobs + // can get scheduled and thus we might not actually remove the same + // percentage of jobs as we expect. + *op.cur -= deleted + + return deleted, nil +} diff --git a/internal/server/singleprocess/state/prune_test.go b/internal/server/singleprocess/state/prune_test.go new file mode 100644 index 000000000..961be5148 --- /dev/null +++ b/internal/server/singleprocess/state/prune_test.go @@ -0,0 +1,217 @@ +package state + +import ( + "sync" + "testing" + + "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" +) + +type pti struct { + Id string +} + +func TestPruneOld(t *testing.T) { + schema := &memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + "items": { + Name: "items", + Indexes: map[string]*memdb.IndexSchema{ + jobIdIndexName: { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Id", + }, + }, + }, + }, + }, + } + + t.Run("prunes none when there is enough room", func(t *testing.T) { + require := require.New(t) + + inmem, err := memdb.NewMemDB(schema) + require.NoError(err) + + txn := inmem.Txn(true) + defer txn.Abort() + + 
require.NoError(txn.Insert("items", &pti{"B"})) + require.NoError(txn.Insert("items", &pti{"C"})) + require.NoError(txn.Insert("items", &pti{"A"})) + require.NoError(txn.Insert("items", &pti{"D"})) + + // Weird order on purpose to validate lexical ordering on pruning. + + txn.Commit() + + txn = inmem.Txn(true) + defer txn.Abort() + + var ( + mu sync.Mutex + indexed int = 4 + ) + + cnt, err := pruneOld(txn, pruneOp{ + lock: &mu, + table: "items", + index: "id", + indexArgs: []interface{}{""}, + cur: &indexed, + max: 4, + }) + require.NoError(err) + + txn.Commit() + + require.Equal(0, cnt) + require.Equal(4, indexed) + + txn = inmem.Txn(false) + defer txn.Abort() + + val, err := txn.First("items", "id", "A") + require.NoError(err) + require.NotNil(val) + + val, err = txn.First("items", "id", "B") + require.NoError(err) + require.NotNil(val) + + val, err = txn.First("items", "id", "C") + require.NoError(err) + require.NotNil(val) + + val, err = txn.First("items", "id", "D") + require.NoError(err) + require.NotNil(val) + }) + + t.Run("deletes a subset of records", func(t *testing.T) { + require := require.New(t) + + inmem, err := memdb.NewMemDB(schema) + require.NoError(err) + + txn := inmem.Txn(true) + defer txn.Abort() + + require.NoError(txn.Insert("items", &pti{"B"})) + require.NoError(txn.Insert("items", &pti{"C"})) + require.NoError(txn.Insert("items", &pti{"A"})) + require.NoError(txn.Insert("items", &pti{"D"})) + + // Weird order on purpose to validate lexical ordering on pruning. 
+ + txn.Commit() + + txn = inmem.Txn(true) + defer txn.Abort() + + var ( + mu sync.Mutex + indexed int = 4 + ) + + cnt, err := pruneOld(txn, pruneOp{ + lock: &mu, + table: "items", + index: "id", + indexArgs: []interface{}{""}, + cur: &indexed, + max: 2, + }) + require.NoError(err) + + txn.Commit() + + require.Equal(2, cnt) + require.Equal(2, indexed) + + txn = inmem.Txn(false) + defer txn.Abort() + + val, err := txn.First("items", "id", "A") + require.NoError(err) + require.Nil(val) + + val, err = txn.First("items", "id", "B") + require.NoError(err) + require.Nil(val) + + val, err = txn.First("items", "id", "C") + require.NoError(err) + require.NotNil(val) + + val, err = txn.First("items", "id", "D") + require.NoError(err) + require.NotNil(val) + }) + + t.Run("deletes all records", func(t *testing.T) { + require := require.New(t) + + inmem, err := memdb.NewMemDB(schema) + require.NoError(err) + + txn := inmem.Txn(true) + defer txn.Abort() + + require.NoError(txn.Insert("items", &pti{"B"})) + require.NoError(txn.Insert("items", &pti{"C"})) + require.NoError(txn.Insert("items", &pti{"A"})) + require.NoError(txn.Insert("items", &pti{"D"})) + + // Weird order on purpose to validate lexical ordering on pruning. 
+ + txn.Commit() + + txn = inmem.Txn(true) + defer txn.Abort() + + var ( + mu sync.Mutex + indexed int = 4 + ) + + cnt, err := pruneOld(txn, pruneOp{ + lock: &mu, + table: "items", + index: "id", + indexArgs: []interface{}{""}, + cur: &indexed, + max: 0, + }) + require.NoError(err) + + txn.Commit() + + require.Equal(4, cnt) + require.Equal(0, indexed) + + txn = inmem.Txn(false) + defer txn.Abort() + + val, err := txn.First("items", "id", "A") + require.NoError(err) + require.Nil(val) + + val, err = txn.First("items", "id", "B") + require.NoError(err) + require.Nil(val) + + val, err = txn.First("items", "id", "C") + require.NoError(err) + require.Nil(val) + + val, err = txn.First("items", "id", "D") + require.NoError(err) + require.Nil(val) + }) + +} diff --git a/internal/server/singleprocess/state/state.go b/internal/server/singleprocess/state/state.go index f77403ed1..2f5f6509b 100644 --- a/internal/server/singleprocess/state/state.go +++ b/internal/server/singleprocess/state/state.go @@ -130,6 +130,25 @@ func (s *State) Close() error { return s.db.Close() } +// Prune should be called on a regular interval to allow State +// to prune out old data. +func (s *State) Prune() error { + memTxn := s.inmem.Txn(true) + defer memTxn.Abort() + + jobs, err := s.jobsPruneOld(memTxn, maximumJobsIndexed) + if err != nil { + return err + } + s.log.Debug("Finished pruning data", + "removed-jobs", jobs, + ) + + memTxn.Commit() + + return nil +} + // schemaFn is an interface function used to create and return new memdb schema // structs for constructing an in-memory db. type schemaFn func() *memdb.TableSchema