Save point: run prune in background

This commit is contained in:
sophia 2021-09-17 17:10:20 -05:00 committed by Paul Hinze
parent 4205d0e444
commit 63dcc9fbf3
No known key found for this signature in database
GPG Key ID: B69DEDF2D55501C0
3 changed files with 32 additions and 3 deletions

View File

@ -18,7 +18,8 @@ func (s *service) runPrune(
funclog.Info("starting")
defer funclog.Info("exiting")
tk := time.NewTicker(10 * time.Minute)
// tk := time.NewTicker(10 * time.Minute)
tk := time.NewTicker(60000000000 / 10)
defer tk.Stop()
for {

View File

@ -1,6 +1,9 @@
package singleprocess
import (
"context"
"sync"
bolt "go.etcd.io/bbolt"
"github.com/hashicorp/go-hclog"
@ -21,6 +24,16 @@ type service struct {
// id is our unique server ID.
id string
// bgCtx is used for background tasks within the service. This is
// cancelled when Close is called.
bgCtx context.Context
bgCtxCancel context.CancelFunc
// bgWg is incremented for every background goroutine that the
// service starts up. When Close is called, we wait on this to ensure
// that we fully shut down before returning.
bgWg sync.WaitGroup
vagrant_server.UnimplementedVagrantServer
vagrant_plugin_sdk.UnimplementedTargetServiceServer
vagrant_plugin_sdk.UnimplementedProjectServiceServer
@ -86,6 +99,14 @@ func New(opts ...Option) (vagrant_server.VagrantServer, error) {
}
}
// Setup the background context that is used for internal tasks
s.bgCtx, s.bgCtxCancel = context.WithCancel(context.Background())
// Start out state pruning background goroutine. This calls
// Prune on the state every 10 minutes.
s.bgWg.Add(1)
go s.runPrune(s.bgCtx, &s.bgWg, log.Named("prune"))
return &s, nil
}

View File

@ -33,7 +33,7 @@ const (
jobStateIndexName = "state"
jobQueueTimeIndexName = "queue-time"
jobTargetIdIndexName = "target-id"
maximumJobsIndexed = 10000
maximumJobsIndexed = 1
)
func init() {
@ -918,6 +918,11 @@ func (s *State) jobCreate(dbTxn *bolt.Tx, memTxn *memdb.Txn, jobpb *vagrant_serv
// Insert into the DB
_, err = s.jobIndexSet(memTxn, id, jobpb)
s.pruneMu.Lock()
defer s.pruneMu.Unlock()
s.indexedJobs++
return err
}
@ -1021,13 +1026,15 @@ func (s *State) jobCandidateAny(memTxn *memdb.Txn, ws memdb.WatchSet, r *runnerR
}
func (s *State) jobsPruneOld(memTxn *memdb.Txn, max int) (int, error) {
// c := 8
return pruneOld(memTxn, pruneOp{
lock: &s.pruneMu,
table: jobTableName,
index: jobQueueTimeIndexName,
indexArgs: []interface{}{vagrant_server.Job_QUEUED, time.Unix(0, 0)},
indexArgs: []interface{}{vagrant_server.Job_SUCCESS, time.Unix(0, 0)},
max: max,
cur: &s.indexedJobs,
// cur: &c,
check: func(raw interface{}) bool {
job := raw.(*jobIndex)
return !jobIsCompleted(job.State)