Seperate pruning jobs from memdb and bolt
only prune out old queued jobs in memdb
This commit is contained in:
parent
21c926523f
commit
2991810e0c
@ -18,8 +18,7 @@ func (s *service) runPrune(
|
||||
funclog.Info("starting")
|
||||
defer funclog.Info("exiting")
|
||||
|
||||
// tk := time.NewTicker(10 * time.Minute)
|
||||
tk := time.NewTicker(60000000000 / 10)
|
||||
tk := time.NewTicker(10 * time.Minute)
|
||||
defer tk.Stop()
|
||||
|
||||
for {
|
||||
|
||||
@ -8,7 +8,6 @@ import (
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/vagrant-plugin-sdk/proto/vagrant_plugin_sdk"
|
||||
"github.com/hashicorp/vagrant/internal/server"
|
||||
"github.com/hashicorp/vagrant/internal/server/proto/vagrant_server"
|
||||
"github.com/hashicorp/vagrant/internal/server/singleprocess/state"
|
||||
@ -35,9 +34,6 @@ type service struct {
|
||||
bgWg sync.WaitGroup
|
||||
|
||||
vagrant_server.UnimplementedVagrantServer
|
||||
vagrant_plugin_sdk.UnimplementedTargetServiceServer
|
||||
vagrant_plugin_sdk.UnimplementedProjectServiceServer
|
||||
vagrant_plugin_sdk.UnimplementedVagrantfileServiceServer
|
||||
}
|
||||
|
||||
// New returns a Vagrant server implementation that uses BoltDB plus
|
||||
|
||||
@ -33,7 +33,8 @@ const (
|
||||
jobStateIndexName = "state"
|
||||
jobQueueTimeIndexName = "queue-time"
|
||||
jobTargetIdIndexName = "target-id"
|
||||
maximumJobsIndexed = 1
|
||||
maximumJobsInMem = 10000
|
||||
maximumJobsIndexed = 10
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -921,6 +922,7 @@ func (s *State) jobCreate(dbTxn *bolt.Tx, memTxn *memdb.Txn, jobpb *vagrant_serv
|
||||
|
||||
s.pruneMu.Lock()
|
||||
defer s.pruneMu.Unlock()
|
||||
s.indexedJobs++
|
||||
|
||||
return err
|
||||
}
|
||||
@ -1025,15 +1027,14 @@ func (s *State) jobCandidateAny(memTxn *memdb.Txn, ws memdb.WatchSet, r *runnerR
|
||||
}
|
||||
|
||||
func (s *State) jobsPruneOld(memTxn *memdb.Txn, max int) (int, error) {
|
||||
cur := dbCount(s.db, jobTableName)
|
||||
// Prune from memdb
|
||||
return pruneOld(memTxn, pruneOp{
|
||||
lock: &s.pruneMu,
|
||||
table: jobTableName,
|
||||
index: jobQueueTimeIndexName,
|
||||
indexArgs: []interface{}{vagrant_server.Job_SUCCESS, time.Unix(0, 0)},
|
||||
indexArgs: []interface{}{vagrant_server.Job_QUEUED, time.Unix(0, 0)},
|
||||
max: max,
|
||||
// cur: &s.indexedJobs,
|
||||
cur: &cur,
|
||||
cur: &s.indexedJobs,
|
||||
check: func(raw interface{}) bool {
|
||||
job := raw.(*jobIndex)
|
||||
return !jobIsCompleted(job.State)
|
||||
@ -1041,6 +1042,40 @@ func (s *State) jobsPruneOld(memTxn *memdb.Txn, max int) (int, error) {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *State) jobsDBPruneOld(max int) (int, error) {
|
||||
cnt := dbCount(s.db, jobTableName)
|
||||
toDelete := cnt - max
|
||||
var deleted int
|
||||
|
||||
// Prune jobs from boltDB
|
||||
s.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(jobTableName))
|
||||
cur := bucket.Cursor()
|
||||
key, _ := cur.First()
|
||||
for {
|
||||
if key == nil {
|
||||
break
|
||||
}
|
||||
// otherwise, prune this job! Once we've pruned enough jobs to get back
|
||||
// to the maximum, we stop pruning.
|
||||
toDelete--
|
||||
|
||||
err := bucket.Delete(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deleted++
|
||||
if toDelete <= 0 {
|
||||
break
|
||||
}
|
||||
key, _ = cur.Next()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return deleted, nil
|
||||
}
|
||||
|
||||
// Job returns the Job for an index.
|
||||
func (idx *jobIndex) Job(jobpb *vagrant_server.Job) *Job {
|
||||
return &Job{
|
||||
|
||||
@ -59,6 +59,9 @@ type State struct {
|
||||
// Where to log to
|
||||
log hclog.Logger
|
||||
|
||||
// indexedJobs indicates how many job records we are tracking in memory
|
||||
indexedJobs int
|
||||
|
||||
// Used to track prune records
|
||||
pruneMu sync.Mutex
|
||||
}
|
||||
@ -140,7 +143,8 @@ func (s *State) Prune() error {
|
||||
memTxn := s.inmem.Txn(true)
|
||||
defer memTxn.Abort()
|
||||
|
||||
jobs, err := s.jobsPruneOld(memTxn, maximumJobsIndexed)
|
||||
// Prune jobs from memdb
|
||||
jobs, err := s.jobsPruneOld(memTxn, maximumJobsInMem)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -149,7 +153,6 @@ func (s *State) Prune() error {
|
||||
)
|
||||
|
||||
memTxn.Commit()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user