sophia 2021-09-17 13:46:00 -05:00 committed by Paul Hinze
parent f845222c44
commit 772e786eee
No known key found for this signature in database
GPG Key ID: B69DEDF2D55501C0
5 changed files with 371 additions and 0 deletions

View File

@ -0,0 +1,35 @@
package singleprocess
import (
"context"
"sync"
"time"
"github.com/hashicorp/go-hclog"
)
func (s *service) runPrune(
ctx context.Context,
wg *sync.WaitGroup,
funclog hclog.Logger,
) {
defer wg.Done()
funclog.Info("starting")
defer funclog.Info("exiting")
tk := time.NewTicker(10 * time.Minute)
defer tk.Stop()
for {
select {
case <-ctx.Done():
return
case <-tk.C:
err := s.state.Prune()
if err != nil {
funclog.Error("error pruning data", "error", err)
}
}
}
}

View File

@ -33,6 +33,7 @@ const (
jobStateIndexName = "state"
jobQueueTimeIndexName = "queue-time"
jobTargetIdIndexName = "target-id"
maximumJobsIndexed = 10000
)
func init() {

View File

@ -0,0 +1,99 @@
package state
import (
"sync"
validation "github.com/go-ozzo/ozzo-validation/v4"
"github.com/hashicorp/go-memdb"
)
type pruneOp struct {
lock *sync.Mutex
table, index string
indexArgs []interface{}
max int
cur *int
check func(val interface{}) bool
}
func (p *pruneOp) Validate() error {
return validation.ValidateStruct(p,
validation.Field(&p.lock, validation.Required),
validation.Field(&p.table, validation.Required),
validation.Field(&p.index, validation.Required),
validation.Field(&p.cur, validation.NilOrNotEmpty),
)
}
// pruneOld uses the types in op to scan the table indicated and prune old records.
// The op's check function can allow the process to skip records that shouldn't be
// pruned regardless of their age.
func pruneOld(memTxn *memdb.Txn, op pruneOp) (int, error) {
if err := op.Validate(); err != nil {
return 0, err
}
op.lock.Lock()
// Easy enough, just exit if we haven't hit the maximum
if *op.cur <= op.max {
op.lock.Unlock()
return 0, nil
}
// Calculate how many jobs we need to prune to get back to the maximum.
pruneCnt := *op.cur - op.max
// Unlock the prune lock for the bulk of the work so we don't prevent new work
// from starting while the prune is taking place.
op.lock.Unlock()
// Now we iterate the jobs, starting we the queue time that is furtherest in the past
// (ie, delete the oldest records first).
iter, err := memTxn.LowerBound(op.table, op.index, op.indexArgs...)
if err != nil {
return 0, err
}
// Track the total values deleted separately because we can exit early
// and so we might want to prune, say, 100 we might only be able to prune 50
// and need to know the exact number.
var deleted int
pruning:
for {
raw := iter.Next()
if raw == nil {
break
}
if op.check != nil && op.check(raw) {
continue pruning
}
// otherwise, prune this job! Once we've pruned enough jobs to get back
// to the maximum, we stop pruning.
pruneCnt--
err = memTxn.Delete(op.table, raw)
if err != nil {
return 0, err
}
deleted++
if pruneCnt <= 0 {
break
}
}
// Grab the lock and update indexedJobs value
op.lock.Lock()
defer op.lock.Unlock()
// We subject the diff here because while prune was running, new jobs
// can get scheduled and thusly we might not actually remove the same
// percentage of jobs as we expect.
*op.cur -= deleted
return deleted, nil
}

View File

@ -0,0 +1,217 @@
package state
import (
"sync"
"testing"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
type pti struct {
Id string
}
func TestPruneOld(t *testing.T) {
schema := &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
"items": {
Name: "items",
Indexes: map[string]*memdb.IndexSchema{
jobIdIndexName: {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Id",
},
},
},
},
},
}
t.Run("prunes none when there is enough room", func(t *testing.T) {
require := require.New(t)
inmem, err := memdb.NewMemDB(schema)
require.NoError(err)
txn := inmem.Txn(true)
defer txn.Abort()
require.NoError(txn.Insert("items", &pti{"B"}))
require.NoError(txn.Insert("items", &pti{"C"}))
require.NoError(txn.Insert("items", &pti{"A"}))
require.NoError(txn.Insert("items", &pti{"D"}))
// Weird order on purpose to validate lexical ordering on pruning.
txn.Commit()
txn = inmem.Txn(true)
defer txn.Abort()
var (
mu sync.Mutex
indexed int = 4
)
cnt, err := pruneOld(txn, pruneOp{
lock: &mu,
table: "items",
index: "id",
indexArgs: []interface{}{""},
cur: &indexed,
max: 4,
})
require.NoError(err)
txn.Commit()
require.Equal(0, cnt)
require.Equal(4, indexed)
txn = inmem.Txn(false)
defer txn.Abort()
val, err := txn.First("items", "id", "A")
require.NoError(err)
require.NotNil(val)
val, err = txn.First("items", "id", "B")
require.NoError(err)
require.NotNil(val)
val, err = txn.First("items", "id", "C")
require.NoError(err)
require.NotNil(val)
val, err = txn.First("items", "id", "D")
require.NoError(err)
require.NotNil(val)
})
t.Run("deletes a subset of records", func(t *testing.T) {
require := require.New(t)
inmem, err := memdb.NewMemDB(schema)
require.NoError(err)
txn := inmem.Txn(true)
defer txn.Abort()
require.NoError(txn.Insert("items", &pti{"B"}))
require.NoError(txn.Insert("items", &pti{"C"}))
require.NoError(txn.Insert("items", &pti{"A"}))
require.NoError(txn.Insert("items", &pti{"D"}))
// Weird order on purpose to validate lexical ordering on pruning.
txn.Commit()
txn = inmem.Txn(true)
defer txn.Abort()
var (
mu sync.Mutex
indexed int = 4
)
cnt, err := pruneOld(txn, pruneOp{
lock: &mu,
table: "items",
index: "id",
indexArgs: []interface{}{""},
cur: &indexed,
max: 2,
})
require.NoError(err)
txn.Commit()
require.Equal(2, cnt)
require.Equal(2, indexed)
txn = inmem.Txn(false)
defer txn.Abort()
val, err := txn.First("items", "id", "A")
require.NoError(err)
require.Nil(val)
val, err = txn.First("items", "id", "B")
require.NoError(err)
require.Nil(val)
val, err = txn.First("items", "id", "C")
require.NoError(err)
require.NotNil(val)
val, err = txn.First("items", "id", "D")
require.NoError(err)
require.NotNil(val)
})
t.Run("deletes all records", func(t *testing.T) {
require := require.New(t)
inmem, err := memdb.NewMemDB(schema)
require.NoError(err)
txn := inmem.Txn(true)
defer txn.Abort()
require.NoError(txn.Insert("items", &pti{"B"}))
require.NoError(txn.Insert("items", &pti{"C"}))
require.NoError(txn.Insert("items", &pti{"A"}))
require.NoError(txn.Insert("items", &pti{"D"}))
// Weird order on purpose to validate lexical ordering on pruning.
txn.Commit()
txn = inmem.Txn(true)
defer txn.Abort()
var (
mu sync.Mutex
indexed int = 4
)
cnt, err := pruneOld(txn, pruneOp{
lock: &mu,
table: "items",
index: "id",
indexArgs: []interface{}{""},
cur: &indexed,
max: 0,
})
require.NoError(err)
txn.Commit()
require.Equal(4, cnt)
require.Equal(0, indexed)
txn = inmem.Txn(false)
defer txn.Abort()
val, err := txn.First("items", "id", "A")
require.NoError(err)
require.Nil(val)
val, err = txn.First("items", "id", "B")
require.NoError(err)
require.Nil(val)
val, err = txn.First("items", "id", "C")
require.NoError(err)
require.Nil(val)
val, err = txn.First("items", "id", "D")
require.NoError(err)
require.Nil(val)
})
}

View File

@ -130,6 +130,25 @@ func (s *State) Close() error {
return s.db.Close()
}
// Prune should be called in a on a regular interval to allow State
// to prune out old data.
func (s *State) Prune() error {
memTxn := s.inmem.Txn(true)
defer memTxn.Abort()
jobs, err := s.jobsPruneOld(memTxn, maximumJobsIndexed)
if err != nil {
return err
}
s.log.Debug("Finished pruning data",
"removed-jobs", jobs,
)
memTxn.Commit()
return nil
}
// schemaFn is an interface function used to create and return new memdb schema
// structs for constructing an in-memory db.
type schemaFn func() *memdb.TableSchema