313 lines
7.9 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package runner
import (
"context"
"errors"
"sync"
"sync/atomic"
"github.com/hashicorp/go-hclog"
plg "github.com/hashicorp/go-plugin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/vagrant-plugin-sdk/internal-shared/cleanup"
"github.com/hashicorp/vagrant-plugin-sdk/terminal"
intcfg "github.com/hashicorp/vagrant/internal/config"
"github.com/hashicorp/vagrant/internal/core"
"github.com/hashicorp/vagrant/internal/plugin"
"github.com/hashicorp/vagrant/internal/server"
"github.com/hashicorp/vagrant/internal/server/proto/vagrant_server"
"github.com/hashicorp/vagrant/internal/serverclient"
)
// ErrClosed is returned by Start when the runner has already been closed.
var ErrClosed = errors.New("runner is closed")
// Runner executes operations in Vagrant. Runners can be local (the CLI)
// or remote (triggered by some webhook). In either case, they share this
// same underlying implementation.
//
// To use a runner:
//
//  1. Initialize it with New. This sets up some initial state but does
//     not register with the server or run jobs.
//
//  2. Start the runner with "Start". This registers the runner and kicks
//     off some management goroutines. This does not execute any jobs.
//
//  3. Run a single job with "Accept". This is named to be similar to a
//     network listener "accepting" a connection. This requests a single
//     job from the Vagrant server, blocks until one is available, and
//     executes it. Repeat this call for however many jobs you want to
//     execute.
//
//  4. Clean up with "Close". This gracefully exits the runner, waiting
//     for any running jobs to finish.
type Runner struct {
	// id is the unique identifier generated for this runner in New and
	// returned by Id.
	id string

	// factory is the core factory built in New. New registers its Close
	// with the runner's cleanup.
	factory *core.Factory

	// logger is the runner's logger. It defaults to hclog.L(), can be
	// replaced with WithLogger, and is reset to the name "vagrant.runner"
	// in New.
	logger hclog.Logger

	// client is the Vagrant server client, set via WithClient. Start uses
	// it to open the runner config stream.
	client *serverclient.VagrantClient

	// cleanup collects the functions that Close runs when the runner is
	// shut down. Closer adds to it.
	cleanup cleanup.Cleanup

	// vagrantRubyRuntime is the plugin client for the Vagrant Ruby
	// runtime, set via WithVagrantRubyRuntime.
	vagrantRubyRuntime plg.ClientProtocol

	// vagrantRubyClient is the client dispensed from vagrantRubyRuntime
	// by WithVagrantRubyRuntime.
	vagrantRubyClient *serverclient.RubyVagrantClient

	// builtinPlugins, when non-nil, is closed by Close. It is not
	// assigned anywhere in this part of the file.
	builtinPlugins *plugin.Builtin

	// ctx is the context handed to the server client, plugin manager and
	// core factory. It defaults to context.Background() and can be
	// replaced with WithContext.
	ctx context.Context

	// runner is the record describing this runner (ID, components,
	// ByIdOnly) that Start sends to the server when registering.
	runner *vagrant_server.Runner

	// ui is the UI supplied through WithLocal; it stays nil unless that
	// option is used.
	ui terminal.UI

	// local is set to true by WithLocal.
	local bool

	// tempDir is not referenced in this part of the file.
	// NOTE(review): presumably a scratch directory used while running
	// jobs — confirm against the rest of the package.
	tempDir string

	// closedVal is 0 while the runner is open and is set to 1 by Close.
	// It is read and written with sync/atomic.
	closedVal int32

	// acceptWg is waited on by Close before any cleanup runs.
	// NOTE(review): presumably tracks jobs in flight in Accept — confirm.
	acceptWg sync.WaitGroup

	// plugins is the plugin manager. New creates one if it is still nil
	// after the options have been applied.
	plugins *plugin.Manager

	// config is the current runner config.
	config *vagrant_server.RunnerConfig

	// originalEnv is not referenced in this part of the file.
	// NOTE(review): presumably the environment captured before runner
	// config was applied — confirm.
	originalEnv []*vagrant_server.ConfigVar

	// this is used for registering plugins to prevent performing the
	// sequence for every operation
	opConfig *intcfg.Config

	// noopCh is used in tests only. This will cause any noop operations
	// to block until this channel is closed.
	noopCh <-chan struct{}
}
// Closer registers fn with the runner's cleanup so that it is run when
// the runner is closed.
func (r *Runner) Closer(fn cleanup.CleanupFn) {
	r.cleanup.Do(fn)
}
// New builds a runner from the given options.
//
// The returned runner is not yet registered with the Vagrant server; call
// Start for that. See the Runner struct docs for the full lifecycle.
//
// An error is returned if a unique ID cannot be generated, if any option
// fails, or if the builtin or legacy plugins cannot be loaded.
func New(opts ...Option) (*Runner, error) {
	// Every runner is identified by a freshly generated ID.
	id, err := server.Id()
	if err != nil {
		return nil, status.Errorf(codes.Internal,
			"failed to generate unique ID: %s", err)
	}

	// Start from the defaults; the options applied below may override them.
	r := &Runner{
		id:       id,
		logger:   hclog.L(),
		cleanup:  cleanup.New(),
		ctx:      context.Background(),
		runner:   &vagrant_server.Runner{Id: id},
		opConfig: &intcfg.Config{},
	}

	// Apply the options in the order given, stopping at the first failure.
	var cfg config
	for _, opt := range opts {
		if optErr := opt(r, &cfg); optErr != nil {
			return nil, optErr
		}
	}

	// Whatever logger ended up configured is renamed for the runner.
	r.logger = r.logger.ResetNamed("vagrant.runner")

	// Create a plugin manager unless one is already in place.
	if r.plugins == nil {
		r.plugins = plugin.NewManager(
			r.ctx,
			r.vagrantRubyClient,
			r.logger.Named("plugin-manager"),
		)
	}

	if loadErr := r.plugins.LoadBuiltins(); loadErr != nil {
		return nil, loadErr
	}

	if loadErr := r.plugins.LoadLegacyPlugins(
		r.vagrantRubyClient,
		r.vagrantRubyRuntime,
	); loadErr != nil {
		return nil, loadErr
	}

	// Advertise one component per (plugin, type) pair.
	//
	// NOTE(review): Name is the component type's string, so it carries the
	// same information as Type — confirm a plugin name was not intended.
	for _, p := range r.plugins.Plugins {
		for _, t := range p.Types {
			component := &vagrant_server.Component{
				Type: vagrant_server.Component_Type(t),
				Name: t.String(),
			}
			r.runner.Components = append(r.runner.Components, component)
		}
	}

	// Build the core factory and register its Close with the cleanup list
	// via Prepend.
	r.factory = core.NewFactory(
		r.ctx,
		r.client,
		r.logger,
		r.plugins,
		r.ui,
	)
	r.cleanup.Prepend(func() error { return r.factory.Close() })

	return r, nil
}
// Id reports the unique identifier that New generated for this runner.
func (r *Runner) Id() string { return r.id }
// Start registers the runner with the Vagrant server and launches the
// goroutines that keep its configuration up to date.
//
// It opens a runner config stream, sends an Open event describing this
// runner, and blocks until the server answers with the first config. That
// config is applied before Start returns, so callers should invoke Start
// directly and not from a goroutine.
//
// Start returns ErrClosed if the runner has already been closed, or the
// underlying error if opening the stream, sending, or receiving fails.
func (r *Runner) Start() error {
	if r.closed() {
		return ErrClosed
	}

	logger := r.logger

	// Open the config stream; it doubles as our registration channel.
	logger.Debug("registering runner")
	stream, err := r.client.RunnerConfig(r.ctx)
	if err != nil {
		return err
	}
	// Make sure the send side of the stream is shut when the runner closes.
	r.Closer(func() error { return stream.CloseSend() })

	// Announce ourselves to the server.
	open := &vagrant_server.RunnerConfigRequest{
		Event: &vagrant_server.RunnerConfigRequest_Open_{
			Open: &vagrant_server.RunnerConfigRequest_Open{
				Runner: r.runner,
			},
		},
	}
	if err := stream.Send(open); err != nil {
		return err
	}

	// The first config message confirms that registration succeeded.
	logger.Trace("runner connected, waiting for initial config")
	first, err := stream.Recv()
	if err != nil {
		return err
	}

	// Apply it synchronously so initial setup is complete on return.
	r.handleConfig(first.Config)

	// Later configs are read by recvConfig and passed to watchConfig
	// through this channel.
	updates := make(chan *vagrant_server.RunnerConfig)
	go r.watchConfig(updates)
	go r.recvConfig(r.ctx, stream, updates)

	logger.Info("runner registered with server")
	return nil
}
// Close shuts the runner down gracefully. It marks the runner as closed,
// waits on acceptWg for pending work to finish, runs the registered
// cleanup functions, and closes the builtin plugins if any are set.
//
// Only the first call does any work; later calls return nil immediately.
// Once the runner is closed, Start returns ErrClosed.
//
// The returned error is whatever the cleanup run reported.
func (r *Runner) Close() error {
	// Only the caller that flips the flag from 0 to 1 proceeds; a failed
	// swap means the runner is already closed.
	if swapped := atomic.CompareAndSwapInt32(&r.closedVal, 0, 1); !swapped {
		return nil
	}

	// Let in-flight work drain before tearing anything down.
	r.acceptWg.Wait()

	r.logger.Info("closing down the runner")

	// Run every registered cleanup function and remember the outcome.
	cleanupErr := r.cleanup.Close()

	if builtins := r.builtinPlugins; builtins != nil {
		builtins.Close()
	}

	r.logger.Info("closing of runner is complete!")
	return cleanupErr
}
// closed reports whether Close has been called, reading the flag
// atomically.
func (r *Runner) closed() bool {
	state := atomic.LoadInt32(&r.closedVal)
	return state > 0
}
// config is passed to every Option alongside the Runner while New applies
// options. It currently has no fields.
type config struct{}
// Option configures a Runner during New. It receives the runner being
// built and the shared config value; a non-nil error makes New fail.
type Option func(*Runner, *config) error
// WithClient supplies the Vagrant server client the runner talks to.
// The given client is used as-is: the runner does not attempt to
// establish a connection of its own, regardless of other configuration
// (env vars or vagrant config file).
func WithClient(client *serverclient.VagrantClient) Option {
	return func(runner *Runner, _ *config) error {
		runner.client = client
		return nil
	}
}
// WithVagrantRubyRuntime configures the runner to use the given Vagrant
// Ruby runtime plugin client. It dispenses the "vagrantrubyruntime" plugin
// from vrr and stores both the runtime and the resulting RubyVagrantClient
// on the runner.
//
// The returned Option reports an error (rather than panicking) when vrr is
// nil, when dispensing fails, or when the dispensed value is not a
// serverclient.RubyVagrantClient. The runner is left unmodified in all of
// those cases.
func WithVagrantRubyRuntime(vrr plg.ClientProtocol) Option {
	return func(r *Runner, cfg *config) error {
		// Calling Dispense on a nil interface would panic, so reject it
		// up front through the error return.
		if vrr == nil {
			return errors.New("vagrant ruby runtime client is nil")
		}

		raw, err := vrr.Dispense("vagrantrubyruntime")
		if err != nil {
			return err
		}

		rvc, ok := raw.(serverclient.RubyVagrantClient)
		if !ok {
			// Options already have an error return; use it so New can
			// fail cleanly instead of crashing the process.
			return errors.New("failed to dispense RubyVagrantClient")
		}

		// Only touch the runner once everything has succeeded.
		r.vagrantRubyRuntime = vrr
		r.vagrantRubyClient = &rvc
		return nil
	}
}
// WithLogger replaces the runner's logger. Without this option the
// runner falls back to hclog.L().
func WithLogger(logger hclog.Logger) Option {
	return func(runner *Runner, _ *config) error {
		runner.logger = logger
		return nil
	}
}
// WithLocal puts the runner into local mode and hands it the UI to use.
// The only effect of local mode is on UI behavior; passing a nil ui
// means the normal streamed UI will be used.
func WithLocal(ui terminal.UI) Option {
	return func(runner *Runner, _ *config) error {
		runner.ui = ui
		runner.local = true
		return nil
	}
}
// ByIdOnly restricts the runner to jobs that target it by its specific
// ID. It does so by setting ByIdOnly on the runner record sent to the
// server.
func ByIdOnly() Option {
	return func(runner *Runner, _ *config) error {
		runner.runner.ByIdOnly = true
		return nil
	}
}
// WithContext sets the context the runner uses. Without this option the
// runner uses context.Background().
func WithContext(ctx context.Context) Option {
	return func(runner *Runner, _ *config) error {
		runner.ctx = ctx
		return nil
	}
}