diff --git a/internal/server/singleprocess/prune.go b/internal/server/singleprocess/prune.go index 1d3535108..220a7fe0f 100644 --- a/internal/server/singleprocess/prune.go +++ b/internal/server/singleprocess/prune.go @@ -18,8 +18,7 @@ func (s *service) runPrune( funclog.Info("starting") defer funclog.Info("exiting") - // tk := time.NewTicker(10 * time.Minute) - tk := time.NewTicker(60000000000 / 10) + tk := time.NewTicker(10 * time.Minute) defer tk.Stop() for { diff --git a/internal/server/singleprocess/service.go b/internal/server/singleprocess/service.go index 95975b0cb..019df3642 100644 --- a/internal/server/singleprocess/service.go +++ b/internal/server/singleprocess/service.go @@ -8,7 +8,6 @@ import ( "github.com/hashicorp/go-hclog" - "github.com/hashicorp/vagrant-plugin-sdk/proto/vagrant_plugin_sdk" "github.com/hashicorp/vagrant/internal/server" "github.com/hashicorp/vagrant/internal/server/proto/vagrant_server" "github.com/hashicorp/vagrant/internal/server/singleprocess/state" @@ -35,9 +34,6 @@ type service struct { bgWg sync.WaitGroup vagrant_server.UnimplementedVagrantServer - vagrant_plugin_sdk.UnimplementedTargetServiceServer - vagrant_plugin_sdk.UnimplementedProjectServiceServer - vagrant_plugin_sdk.UnimplementedVagrantfileServiceServer } // New returns a Vagrant server implementation that uses BoltDB plus diff --git a/internal/server/singleprocess/state/job.go b/internal/server/singleprocess/state/job.go index c44c10a0a..cc4ef53ad 100644 --- a/internal/server/singleprocess/state/job.go +++ b/internal/server/singleprocess/state/job.go @@ -33,7 +33,8 @@ const ( jobStateIndexName = "state" jobQueueTimeIndexName = "queue-time" jobTargetIdIndexName = "target-id" - maximumJobsIndexed = 1 + maximumJobsInMem = 10000 + maximumJobsIndexed = 10 ) func init() { @@ -921,6 +922,7 @@ func (s *State) jobCreate(dbTxn *bolt.Tx, memTxn *memdb.Txn, jobpb *vagrant_serv s.pruneMu.Lock() defer s.pruneMu.Unlock() + s.indexedJobs++ return err } @@ -1025,15 +1027,14 @@ func (s *State) jobCandidateAny(memTxn *memdb.Txn, ws memdb.WatchSet, r *runnerR } func (s *State) jobsPruneOld(memTxn *memdb.Txn, max int) (int, error) { - cur := dbCount(s.db, jobTableName) + // Prune from memdb return pruneOld(memTxn, pruneOp{ lock: &s.pruneMu, table: jobTableName, index: jobQueueTimeIndexName, - indexArgs: []interface{}{vagrant_server.Job_SUCCESS, time.Unix(0, 0)}, + indexArgs: []interface{}{vagrant_server.Job_QUEUED, time.Unix(0, 0)}, max: max, - // cur: &s.indexedJobs, - cur: &cur, + cur: &s.indexedJobs, check: func(raw interface{}) bool { job := raw.(*jobIndex) return !jobIsCompleted(job.State) @@ -1041,6 +1042,40 @@ func (s *State) jobsPruneOld(memTxn *memdb.Txn, max int) (int, error) { }) } +func (s *State) jobsDBPruneOld(max int) (int, error) { + cnt := dbCount(s.db, jobTableName) + toDelete := cnt - max + var deleted int + + // Prune jobs from boltDB + s.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(jobTableName)) + cur := bucket.Cursor() + key, _ := cur.First() + for { + if key == nil { + break + } + // otherwise, prune this job! Once we've pruned enough jobs to get back + // to the maximum, we stop pruning. + toDelete-- + + err := bucket.Delete(key) + if err != nil { + return err + } + + deleted++ + if toDelete <= 0 { + break + } + key, _ = cur.Next() + } + return nil + }) + return deleted, nil +} + // Job returns the Job for an index. func (idx *jobIndex) Job(jobpb *vagrant_server.Job) *Job { return &Job{ diff --git a/internal/server/singleprocess/state/state.go b/internal/server/singleprocess/state/state.go index 82f8bfe7b..7c8ab5193 100644 --- a/internal/server/singleprocess/state/state.go +++ b/internal/server/singleprocess/state/state.go @@ -59,6 +59,9 @@ type State struct { // Where to log to log hclog.Logger + // indexedJobs indicates how many job records we are tracking in memory + indexedJobs int + // Used to track prune records pruneMu sync.Mutex } @@ -140,7 +143,8 @@ func (s *State) Prune() error { memTxn := s.inmem.Txn(true) defer memTxn.Abort() - jobs, err := s.jobsPruneOld(memTxn, maximumJobsIndexed) + // Prune jobs from memdb + jobs, err := s.jobsPruneOld(memTxn, maximumJobsInMem) if err != nil { return err } @@ -149,7 +153,6 @@ func (s *State) Prune() error { ) memTxn.Commit() - return nil }