Attach functions to client instead of basis

This commit is contained in:
Chris Roberts 2021-08-05 11:24:38 -07:00 committed by Paul Hinze
parent 1c87679bc0
commit 07dbd15ea6
No known key found for this signature in database
GPG Key ID: B69DEDF2D55501C0
5 changed files with 93 additions and 179 deletions

View File

@ -15,12 +15,14 @@ import (
"github.com/hashicorp/vagrant/internal/server/proto/vagrant_server"
)
type JobModifier func(*vagrant_server.Job)
// job returns the basic job skeleton prepoulated with the correct
// defaults based on how the client is configured. For example, for local
// operations, this will already have the targeting for the local runner.
func (b *Basis) job() *vagrant_server.Job {
func (c *Client) job() *vagrant_server.Job {
job := &vagrant_server.Job{
TargetRunner: b.runner,
TargetRunner: c.runnerRef,
DataSource: &vagrant_server.Job_DataSource{
Source: &vagrant_server.Job_DataSource_Local{
@ -33,61 +35,28 @@ func (b *Basis) job() *vagrant_server.Job {
},
}
job.Basis = b.Ref()
if b.Project != nil {
job.Project = b.Project.Ref()
}
// If we're not local, we set a nil data source so it defaults to
// whatever the project has remotely.
if !b.local {
job.DataSource = nil
}
return job
}
// doJob will queue and execute the job. If the client is configured for
// local mode, this will start and target the proper runner.
func (b *Basis) doJob(
func (c *Client) doJob(
ctx context.Context,
job *vagrant_server.Job,
ui terminal.UI,
) (*vagrant_server.Job_Result, error) {
log := b.logger
log := c.logger
if ui == nil {
ui = b.UI()
ui = c.ui
}
// cb is used in local mode only to get a callback of the job ID
// so we can tell our runner what ID to expect.
var cb func(string)
// In local mode we have to start a runner.
if b.local {
if b.localRunner == nil {
log.Info("local mode, starting local runner")
r, err := b.startRunner()
if err != nil {
return nil, err
}
b.localRunner = r
b.cleanup(func() { r.Close() })
}
r := b.localRunner
log.Info("using local runner", "runner_id", r.Id())
// We defer the close so that we clean up resources. Local mode
// always blocks and streams the full output so when doJob exits
// the job is complete.
// defer r.Close()
// In local mode we need to setup our callback
if c.localRunner {
var jobCh chan struct{}
defer func() {
@ -104,46 +73,37 @@ func (b *Basis) doJob(
jobCh = make(chan struct{})
go func() {
defer close(jobCh)
if err := r.AcceptExact(ctx, id); err != nil {
if err := c.runner.AcceptExact(ctx, id); err != nil {
log.Error("runner job accept error", "err", err)
}
}()
}
// Modify the job to target this runner and use the local data source.
job.TargetRunner = &vagrant_server.Ref_Runner{
Target: &vagrant_server.Ref_Runner_Id{
Id: &vagrant_server.Ref_RunnerId{
Id: r.Id(),
},
},
}
}
return b.queueAndStreamJob(ctx, job, ui, cb)
return c.queueAndStreamJob(ctx, job, ui, cb)
}
// queueAndStreamJob will queue the job. If the client is configured to watch the job,
// it'll also stream the output to the configured UI.
func (b *Basis) queueAndStreamJob(
func (c *Client) queueAndStreamJob(
ctx context.Context,
job *vagrant_server.Job,
ui terminal.UI,
jobIdCallback func(string),
) (*vagrant_server.Job_Result, error) {
log := b.logger
log := c.logger
// When local, we set an expiration here in case we can't gracefully
// cancel in the event of an error. This will ensure that the jobs don't
// remain queued forever. This is only for local ops.
expiration := ""
if b.local {
if c.localRunner {
expiration = "30s"
}
// Queue the job
log.Debug("queueing job", "operation", fmt.Sprintf("%T", job.Operation))
queueResp, err := b.client.QueueJob(ctx, &vagrant_server.QueueJobRequest{
queueResp, err := c.client.QueueJob(ctx, &vagrant_server.QueueJobRequest{
Job: job,
ExpiresIn: expiration,
})
@ -159,7 +119,7 @@ func (b *Basis) queueAndStreamJob(
// Get the stream
log.Debug("opening job stream")
stream, err := b.client.GetJobStream(ctx, &vagrant_server.GetJobStreamRequest{
stream, err := c.client.GetJobStream(ctx, &vagrant_server.GetJobStreamRequest{
JobId: queueResp.JobId,
})
if err != nil {
@ -196,7 +156,7 @@ func (b *Basis) queueAndStreamJob(
steps = map[int32]*stepData{}
)
if b.local {
if c.localRunner {
defer func() {
// If we completed then do nothing, or if the context is still
// active since this means that we're not cancelled.
@ -208,7 +168,7 @@ func (b *Basis) queueAndStreamJob(
defer cancel()
log.Warn("canceling job")
_, err := b.client.CancelJob(ctx, &vagrant_server.CancelJobRequest{
_, err := c.client.CancelJob(ctx, &vagrant_server.CancelJobRequest{
JobId: queueResp.JobId,
})
if err != nil {
@ -252,7 +212,7 @@ func (b *Basis) queueAndStreamJob(
case *vagrant_server.GetJobStreamResponse_Terminal_:
// Ignore this for local jobs since we're using our UI directly.
if b.local {
if c.localRunner {
continue
}
@ -359,7 +319,7 @@ func (b *Basis) queueAndStreamJob(
step.Done()
}
default:
b.logger.Error("Unknown terminal event seen", "type", hclog.Fmt("%T", ev))
c.logger.Error("Unknown terminal event seen", "type", hclog.Fmt("%T", ev))
}
}
case *vagrant_server.GetJobStreamResponse_State_:

View File

@ -13,14 +13,14 @@ import (
// A noop operation will exercise the full logic of queueing a job,
// assigning it to a runner, dequeueing as a runner, executing, etc. It will
// use real remote runners if the client is configured to do so.
func (b *Basis) Noop(ctx context.Context) error {
func (c *Client) Noop(ctx context.Context) error {
// Build our job
job := b.job()
job := c.job()
job.Operation = &vagrant_server.Job_Noop_{
Noop: &vagrant_server.Job_Noop{},
}
// Execute it
_, err := b.doJob(ctx, job, nil)
_, err := c.doJob(ctx, job, c.ui)
return err
}

View File

@ -8,22 +8,26 @@ import (
"github.com/hashicorp/vagrant/internal/server/proto/vagrant_server"
)
func (b *Basis) Validate(
func (c *Client) Validate(
ctx context.Context,
op *vagrant_server.Job_ValidateOp,
mod JobModifier,
) (*vagrant_server.Job_ValidateResult, error) {
if op == nil {
op = &vagrant_server.Job_ValidateOp{}
}
// Validate our job
job := b.job()
job := c.job()
job.Operation = &vagrant_server.Job_Validate{
Validate: op,
}
if mod != nil {
mod(job)
}
// Execute it
result, err := b.doJob(ctx, job, nil)
result, err := c.doJob(ctx, job, c.ui)
if err != nil {
return nil, err
}
@ -31,20 +35,24 @@ func (b *Basis) Validate(
return result.Validate, nil
}
func (b *Basis) Commands(
func (c *Client) Commands(
ctx context.Context,
op *vagrant_server.Job_InitOp,
mod JobModifier,
) (*vagrant_server.Job_InitResult, error) {
if op == nil {
op = &vagrant_server.Job_InitOp{}
}
job := b.job()
job := c.job()
job.Operation = &vagrant_server.Job_Init{
Init: op,
}
if mod != nil {
mod(job)
}
result, err := b.doJob(ctx, job, nil)
result, err := c.doJob(ctx, job, c.ui)
if err != nil {
return nil, err
@ -53,101 +61,49 @@ func (b *Basis) Commands(
return result.Init, nil
}
func (p *Project) Commands(
ctx context.Context,
op *vagrant_server.Job_InitOp,
) (*vagrant_server.Job_InitResult, error) {
if op == nil {
op = &vagrant_server.Job_InitOp{}
}
job := p.job()
job.Operation = &vagrant_server.Job_Init{
Init: op,
}
result, err := p.doJob(ctx, job, nil)
if err != nil {
return nil, err
}
return result.Init, nil
}
func (b *Basis) Task(
func (c *Client) Task(
ctx context.Context,
op *vagrant_server.Job_RunOp,
mod JobModifier,
) (*vagrant_server.Job_RunResult, error) {
if op == nil {
op = &vagrant_server.Job_RunOp{}
}
job := b.job()
job := c.job()
job.Operation = &vagrant_server.Job_Run{
Run: op,
}
if mod != nil {
mod(job)
}
result, err := b.doJob(ctx, job, nil)
result, err := c.doJob(ctx, job, c.ui)
return result.Run, err
}
func (p *Project) Task(
ctx context.Context,
op *vagrant_server.Job_RunOp,
) (*vagrant_server.Job_RunResult, error) {
if op == nil {
op = &vagrant_server.Job_RunOp{}
}
job := p.job()
job.Operation = &vagrant_server.Job_Run{
Run: op,
}
result, err := p.doJob(ctx, job, nil)
return result.Run, err
}
func (m *Target) Task(
ctx context.Context,
op *vagrant_server.Job_RunOp,
) (*vagrant_server.Job_RunResult, error) {
if op == nil {
op = &vagrant_server.Job_RunOp{}
}
job := m.job()
job.Operation = &vagrant_server.Job_Run{
Run: op,
}
result, err := m.doJob(ctx, job)
if err != nil {
return nil, err
}
return result.Run, err
}
func (b *Basis) Auth(
func (c *Client) Auth(
ctx context.Context,
op *vagrant_server.Job_AuthOp,
mod JobModifier,
) (*vagrant_server.Job_AuthResult, error) {
if op == nil {
op = &vagrant_server.Job_AuthOp{}
}
// Auth our job
job := b.job()
job := c.job()
job.Operation = &vagrant_server.Job_Auth{
Auth: op,
}
if mod != nil {
mod(job)
}
// Execute it
result, err := b.doJob(ctx, job, nil)
result, err := c.doJob(ctx, job, c.ui)
if err != nil {
return nil, err
}
@ -155,21 +111,25 @@ func (b *Basis) Auth(
return result.Auth, nil
}
func (b *Basis) Docs(
func (c *Client) Docs(
ctx context.Context,
op *vagrant_server.Job_DocsOp,
mod JobModifier,
) (*vagrant_server.Job_DocsResult, error) {
if op == nil {
op = &vagrant_server.Job_DocsOp{}
}
job := b.job()
job := c.job()
job.Operation = &vagrant_server.Job_Docs{
Docs: op,
}
if mod != nil {
mod(job)
}
// Execute it
result, err := b.doJob(ctx, job, nil)
result, err := c.doJob(ctx, job, c.ui)
if err != nil {
return nil, err
}
@ -177,14 +137,15 @@ func (b *Basis) Docs(
return result.Docs, nil
}
func (b *Basis) Logs(ctx context.Context) (component.LogViewer, error) {
log := b.logger.Named("logs")
// TODO(spox): need to think about how to apply this
func (c *Client) Logs(ctx context.Context) (component.LogViewer, error) {
log := c.logger.Named("logs")
// First we attempt to query the server for logs for this deployment.
log.Info("requesting log stream")
client, err := b.client.GetLogStream(ctx, &vagrant_server.GetLogStreamRequest{
client, err := c.client.GetLogStream(ctx, &vagrant_server.GetLogStreamRequest{
Scope: &vagrant_server.GetLogStreamRequest_Basis{
Basis: b.Ref(),
// Basis: b.Ref(),
},
})
if err != nil {

View File

@ -6,14 +6,14 @@ import (
// startRunner initializes and starts a local runner. If the returned
// runner is non-nil, you must call Close on it to clean up resources properly.
func (b *Basis) startRunner() (*runner.Runner, error) {
func (c *Client) startRunner() (*runner.Runner, error) {
// Initialize our runner
r, err := runner.New(
runner.WithClient(b.client),
runner.WithVagrantRubyRuntime(b.vagrantRubyRuntime),
runner.WithLogger(b.logger),
runner.ByIdOnly(), // We'll direct target this
runner.WithLocal(b.UI()), // Local mode
runner.WithClient(c.client),
runner.WithVagrantRubyRuntime(c.rubyRuntime),
runner.WithLogger(c.logger),
runner.ByIdOnly(), // We'll direct target this
runner.WithLocal(c.ui), // Local mode
)
if err != nil {
return nil, err

View File

@ -34,12 +34,12 @@ import (
// 2. If WithLocal was specified and no connection addresses can be
// found, this will spin up an in-memory server.
//
func (b *Basis) initServerClient(ctx context.Context, cfg *config) (*grpc.ClientConn, error) {
log := b.logger.ResetNamed("vagrant.server")
func (c *Client) initServerClient(ctx context.Context, cfg *clientConfig) (*grpc.ClientConn, error) {
log := c.logger.ResetNamed("vagrant.server")
// If we're local, then connection is optional.
opts := cfg.connectOpts
if b.local {
if true { // c.localServer {
log.Trace("WithLocal set, server credentials optional")
opts = append(opts, serverclient.Optional())
}
@ -54,7 +54,7 @@ func (b *Basis) initServerClient(ctx context.Context, cfg *config) (*grpc.Client
// If we established a connection
if conn != nil {
log.Debug("connection established with sourced credentials")
b.cleanup(func() { conn.Close() })
c.Cleanup(func() error { return conn.Close() })
return conn, nil
}
@ -63,7 +63,7 @@ func (b *Basis) initServerClient(ctx context.Context, cfg *config) (*grpc.Client
// which is only possible if we configured this client to support local
// mode.
log.Info("no server credentials found, using in-memory local server")
return b.initLocalServer(ctx)
return c.initLocalServer(ctx)
}
// initLocalServer starts the local server and configures p.client to
@ -72,14 +72,14 @@ func (b *Basis) initServerClient(ctx context.Context, cfg *config) (*grpc.Client
//
// If this returns an error, all resources associated with this operation
// will be closed, but the project can retry.
func (b *Basis) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err error) {
log := b.logger.ResetNamed("vagrant.server")
b.localServer = true
func (c *Client) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err error) {
log := c.logger.ResetNamed("vagrant.server")
c.localServer = true
// We use this pointer to accumulate things we need to clean up
// in the case of an error. On success we nil this variable which
// doesn't close anything.
var cleanups []func()
var cleanups []func() error
// If we encounter an error force all the
// local cleanups to run
@ -102,7 +102,7 @@ func (b *Basis) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err er
if err != nil {
return
}
cleanups = append(cleanups, func() { db.Close() })
cleanups = append(cleanups, func() error { return db.Close() })
// Create our server
impl, err := singleprocess.New(
@ -120,7 +120,7 @@ func (b *Basis) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err er
if err != nil {
return
}
cleanups = append(cleanups, func() { ln.Close() })
cleanups = append(cleanups, func() error { return ln.Close() })
// Create a new cancellation context so we can cancel in the case of an error
ctx, cancel := context.WithCancel(ctx)
@ -161,20 +161,15 @@ func (b *Basis) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err er
}
// Have the defined cleanups run when the basis is closed
b.cleanup(func() {
for _, c := range cleanups {
c()
}
})
c.Cleanup(cleanups...)
_ = cancel // pacify vet lostcancel
return client.Conn(), nil
}
func (b *Basis) initVagrantRubyRuntime() (rubyRuntime plugin.ClientProtocol, err error) {
func (c *Client) initVagrantRubyRuntime() (rubyRuntime plugin.ClientProtocol, err error) {
// TODO: Update for actual release usage. This is dev only now.
// TODO: We should also locate a free port on startup and use that port
_, this_dir, _, _ := runtime.Caller(0)
cmd := exec.Command(
"bundle", "exec", "vagrant", "serve",
@ -186,34 +181,32 @@ func (b *Basis) initVagrantRubyRuntime() (rubyRuntime plugin.ClientProtocol, err
"VAGRANT_LOG_FILE=/tmp/vagrant.log",
}
config := serverclient.RubyVagrantPluginConfig(b.logger)
config := serverclient.RubyVagrantPluginConfig(c.logger)
config.Cmd = cmd
c := plugin.NewClient(config)
if _, err = c.Start(); err != nil {
rc := plugin.NewClient(config)
if _, err = rc.Start(); err != nil {
return
}
if rubyRuntime, err = c.Client(); err != nil {
if rubyRuntime, err = rc.Client(); err != nil {
return
}
// Ensure the plugin is halted when the basis is cleaned up
b.cleanup(func() { rubyRuntime.Close() })
c.Cleanup(func() error { return rubyRuntime.Close() })
return
}
// negotiateApiVersion negotiates the API version to use and validates
// that we are compatible to talk to the server.
func (b *Basis) negotiateApiVersion(ctx context.Context) error {
log := b.logger
log.Trace("requesting version info from server")
resp, err := b.client.GetVersionInfo(ctx, &empty.Empty{})
func (c *Client) negotiateApiVersion(ctx context.Context) error {
c.logger.Trace("requesting version info from server")
resp, err := c.client.GetVersionInfo(ctx, &empty.Empty{})
if err != nil {
return err
}
log.Info("server version info",
c.logger.Info("server version info",
"version", resp.Info.Version,
"api_min", resp.Info.Api.Minimum,
"api_current", resp.Info.Api.Current,
@ -226,6 +219,6 @@ func (b *Basis) negotiateApiVersion(ctx context.Context) error {
return err
}
log.Info("negotiated api version", "version", vsn)
c.logger.Info("negotiated api version", "version", vsn)
return nil
}