From 07dbd15ea68501dc9c8c182f55560cd735824bc9 Mon Sep 17 00:00:00 2001 From: Chris Roberts Date: Thu, 5 Aug 2021 11:24:38 -0700 Subject: [PATCH] Attach functions to client instead of basis --- internal/client/job.go | 80 ++++++----------------- internal/client/noop.go | 6 +- internal/client/operation.go | 121 ++++++++++++----------------------- internal/client/runner.go | 12 ++-- internal/client/server.go | 53 +++++++-------- 5 files changed, 93 insertions(+), 179 deletions(-) diff --git a/internal/client/job.go b/internal/client/job.go index 02144f179..b5abc706b 100644 --- a/internal/client/job.go +++ b/internal/client/job.go @@ -15,12 +15,14 @@ import ( "github.com/hashicorp/vagrant/internal/server/proto/vagrant_server" ) +type JobModifier func(*vagrant_server.Job) + // job returns the basic job skeleton prepoulated with the correct // defaults based on how the client is configured. For example, for local // operations, this will already have the targeting for the local runner. -func (b *Basis) job() *vagrant_server.Job { +func (c *Client) job() *vagrant_server.Job { job := &vagrant_server.Job{ - TargetRunner: b.runner, + TargetRunner: c.runnerRef, DataSource: &vagrant_server.Job_DataSource{ Source: &vagrant_server.Job_DataSource_Local{ @@ -33,61 +35,28 @@ func (b *Basis) job() *vagrant_server.Job { }, } - job.Basis = b.Ref() - if b.Project != nil { - job.Project = b.Project.Ref() - } - - // If we're not local, we set a nil data source so it defaults to - // whatever the project has remotely. - if !b.local { - job.DataSource = nil - } - return job } // doJob will queue and execute the job. If the client is configured for // local mode, this will start and target the proper runner. -func (b *Basis) doJob( +func (c *Client) doJob( ctx context.Context, job *vagrant_server.Job, ui terminal.UI, ) (*vagrant_server.Job_Result, error) { - log := b.logger + log := c.logger if ui == nil { - ui = b.UI() + ui = c.ui } // cb is used in local mode only to get a callback of the job ID // so we can tell our runner what ID to expect. var cb func(string) - // In local mode we have to start a runner. - if b.local { - - if b.localRunner == nil { - log.Info("local mode, starting local runner") - r, err := b.startRunner() - if err != nil { - return nil, err - } - - b.localRunner = r - b.cleanup(func() { r.Close() }) - } - - r := b.localRunner - - log.Info("using local runner", "runner_id", r.Id()) - - // We defer the close so that we clean up resources. Local mode - // always blocks and streams the full output so when doJob exits - // the job is complete. - - // defer r.Close() - + // In local mode we need to setup our callback + if c.localRunner { var jobCh chan struct{} defer func() { @@ -104,46 +73,37 @@ func (b *Basis) doJob( jobCh = make(chan struct{}) go func() { defer close(jobCh) - if err := r.AcceptExact(ctx, id); err != nil { + if err := c.runner.AcceptExact(ctx, id); err != nil { log.Error("runner job accept error", "err", err) } }() } - - // Modify the job to target this runner and use the local data source. - job.TargetRunner = &vagrant_server.Ref_Runner{ - Target: &vagrant_server.Ref_Runner_Id{ - Id: &vagrant_server.Ref_RunnerId{ - Id: r.Id(), - }, - }, - } } - return b.queueAndStreamJob(ctx, job, ui, cb) + return c.queueAndStreamJob(ctx, job, ui, cb) } // queueAndStreamJob will queue the job. If the client is configured to watch the job, // it'll also stream the output to the configured UI. -func (b *Basis) queueAndStreamJob( +func (c *Client) queueAndStreamJob( ctx context.Context, job *vagrant_server.Job, ui terminal.UI, jobIdCallback func(string), ) (*vagrant_server.Job_Result, error) { - log := b.logger + log := c.logger // When local, we set an expiration here in case we can't gracefully // cancel in the event of an error. This will ensure that the jobs don't // remain queued forever. This is only for local ops. expiration := "" - if b.local { + if c.localRunner { expiration = "30s" } // Queue the job log.Debug("queueing job", "operation", fmt.Sprintf("%T", job.Operation)) - queueResp, err := b.client.QueueJob(ctx, &vagrant_server.QueueJobRequest{ + queueResp, err := c.client.QueueJob(ctx, &vagrant_server.QueueJobRequest{ Job: job, ExpiresIn: expiration, }) @@ -159,7 +119,7 @@ func (b *Basis) queueAndStreamJob( // Get the stream log.Debug("opening job stream") - stream, err := b.client.GetJobStream(ctx, &vagrant_server.GetJobStreamRequest{ + stream, err := c.client.GetJobStream(ctx, &vagrant_server.GetJobStreamRequest{ JobId: queueResp.JobId, }) if err != nil { @@ -196,7 +156,7 @@ func (b *Basis) queueAndStreamJob( steps = map[int32]*stepData{} ) - if b.local { + if c.localRunner { defer func() { // If we completed then do nothing, or if the context is still // active since this means that we're not cancelled. @@ -208,7 +168,7 @@ func (b *Basis) queueAndStreamJob( defer cancel() log.Warn("canceling job") - _, err := b.client.CancelJob(ctx, &vagrant_server.CancelJobRequest{ + _, err := c.client.CancelJob(ctx, &vagrant_server.CancelJobRequest{ JobId: queueResp.JobId, }) if err != nil { @@ -252,7 +212,7 @@ func (b *Basis) queueAndStreamJob( case *vagrant_server.GetJobStreamResponse_Terminal_: // Ignore this for local jobs since we're using our UI directly. - if b.local { + if c.localRunner { continue } @@ -359,7 +319,7 @@ func (b *Basis) queueAndStreamJob( step.Done() } default: - b.logger.Error("Unknown terminal event seen", "type", hclog.Fmt("%T", ev)) + c.logger.Error("Unknown terminal event seen", "type", hclog.Fmt("%T", ev)) } } case *vagrant_server.GetJobStreamResponse_State_: diff --git a/internal/client/noop.go b/internal/client/noop.go index c183ee9e5..360c0bdeb 100644 --- a/internal/client/noop.go +++ b/internal/client/noop.go @@ -13,14 +13,14 @@ import ( // A noop operation will exercise the full logic of queueing a job, // assigning it to a runner, dequeueing as a runner, executing, etc. It will // use real remote runners if the client is configured to do so. -func (b *Basis) Noop(ctx context.Context) error { +func (c *Client) Noop(ctx context.Context) error { // Build our job - job := b.job() + job := c.job() job.Operation = &vagrant_server.Job_Noop_{ Noop: &vagrant_server.Job_Noop{}, } // Execute it - _, err := b.doJob(ctx, job, nil) + _, err := c.doJob(ctx, job, c.ui) return err } diff --git a/internal/client/operation.go b/internal/client/operation.go index 5b348c65f..cc1067b67 100644 --- a/internal/client/operation.go +++ b/internal/client/operation.go @@ -8,22 +8,26 @@ import ( "github.com/hashicorp/vagrant/internal/server/proto/vagrant_server" ) -func (b *Basis) Validate( +func (c *Client) Validate( ctx context.Context, op *vagrant_server.Job_ValidateOp, + mod JobModifier, ) (*vagrant_server.Job_ValidateResult, error) { if op == nil { op = &vagrant_server.Job_ValidateOp{} } // Validate our job - job := b.job() + job := c.job() job.Operation = &vagrant_server.Job_Validate{ Validate: op, } + if mod != nil { + mod(job) + } // Execute it - result, err := b.doJob(ctx, job, nil) + result, err := c.doJob(ctx, job, c.ui) if err != nil { return nil, err } @@ -31,20 +35,24 @@ func (b *Basis) Validate( return result.Validate, nil } -func (b *Basis) Commands( +func (c *Client) Commands( ctx context.Context, op *vagrant_server.Job_InitOp, + mod JobModifier, ) (*vagrant_server.Job_InitResult, error) { if op == nil { op = &vagrant_server.Job_InitOp{} } - job := b.job() + job := c.job() job.Operation = &vagrant_server.Job_Init{ Init: op, } + if mod != nil { + mod(job) + } - result, err := b.doJob(ctx, job, nil) + result, err := c.doJob(ctx, job, c.ui) if err != nil { return nil, err @@ -53,101 +61,49 @@ func (b *Basis) Commands( return result.Init, nil } -func (p *Project) Commands( - ctx context.Context, - op *vagrant_server.Job_InitOp, -) (*vagrant_server.Job_InitResult, error) { - if op == nil { - op = &vagrant_server.Job_InitOp{} - } - - job := p.job() - job.Operation = &vagrant_server.Job_Init{ - Init: op, - } - - result, err := p.doJob(ctx, job, nil) - - if err != nil { - return nil, err - } - - return result.Init, nil -} - -func (b *Basis) Task( +func (c *Client) Task( ctx context.Context, op *vagrant_server.Job_RunOp, + mod JobModifier, + ) (*vagrant_server.Job_RunResult, error) { if op == nil { op = &vagrant_server.Job_RunOp{} } - job := b.job() + job := c.job() job.Operation = &vagrant_server.Job_Run{ Run: op, } + if mod != nil { + mod(job) + } - result, err := b.doJob(ctx, job, nil) + result, err := c.doJob(ctx, job, c.ui) return result.Run, err } -func (p *Project) Task( - ctx context.Context, - op *vagrant_server.Job_RunOp, -) (*vagrant_server.Job_RunResult, error) { - if op == nil { - op = &vagrant_server.Job_RunOp{} - } - - job := p.job() - job.Operation = &vagrant_server.Job_Run{ - Run: op, - } - - result, err := p.doJob(ctx, job, nil) - - return result.Run, err -} - -func (m *Target) Task( - ctx context.Context, - op *vagrant_server.Job_RunOp, -) (*vagrant_server.Job_RunResult, error) { - if op == nil { - op = &vagrant_server.Job_RunOp{} - } - - job := m.job() - job.Operation = &vagrant_server.Job_Run{ - Run: op, - } - - result, err := m.doJob(ctx, job) - if err != nil { - return nil, err - } - - return result.Run, err -} - -func (b *Basis) Auth( +func (c *Client) Auth( ctx context.Context, op *vagrant_server.Job_AuthOp, + mod JobModifier, ) (*vagrant_server.Job_AuthResult, error) { if op == nil { op = &vagrant_server.Job_AuthOp{} } // Auth our job - job := b.job() + job := c.job() job.Operation = &vagrant_server.Job_Auth{ Auth: op, } + if mod != nil { + mod(job) + } // Execute it - result, err := b.doJob(ctx, job, nil) + result, err := c.doJob(ctx, job, c.ui) if err != nil { return nil, err } @@ -155,21 +111,25 @@ func (b *Basis) Auth( return result.Auth, nil } -func (b *Basis) Docs( +func (c *Client) Docs( ctx context.Context, op *vagrant_server.Job_DocsOp, + mod JobModifier, ) (*vagrant_server.Job_DocsResult, error) { if op == nil { op = &vagrant_server.Job_DocsOp{} } - job := b.job() + job := c.job() job.Operation = &vagrant_server.Job_Docs{ Docs: op, } + if mod != nil { + mod(job) + } // Execute it - result, err := b.doJob(ctx, job, nil) + result, err := c.doJob(ctx, job, c.ui) if err != nil { return nil, err } @@ -177,14 +137,15 @@ func (b *Basis) Docs( return result.Docs, nil } -func (b *Basis) Logs(ctx context.Context) (component.LogViewer, error) { - log := b.logger.Named("logs") +// TODO(spox): need to think about how to apply this +func (c *Client) Logs(ctx context.Context) (component.LogViewer, error) { + log := c.logger.Named("logs") // First we attempt to query the server for logs for this deployment. log.Info("requesting log stream") - client, err := b.client.GetLogStream(ctx, &vagrant_server.GetLogStreamRequest{ + client, err := c.client.GetLogStream(ctx, &vagrant_server.GetLogStreamRequest{ Scope: &vagrant_server.GetLogStreamRequest_Basis{ - Basis: b.Ref(), + // Basis: b.Ref(), }, }) if err != nil { diff --git a/internal/client/runner.go b/internal/client/runner.go index 340248177..abf22dc44 100644 --- a/internal/client/runner.go +++ b/internal/client/runner.go @@ -6,14 +6,14 @@ import ( // startRunner initializes and starts a local runner. If the returned // runner is non-nil, you must call Close on it to clean up resources properly. -func (b *Basis) startRunner() (*runner.Runner, error) { +func (c *Client) startRunner() (*runner.Runner, error) { // Initialize our runner r, err := runner.New( - runner.WithClient(b.client), - runner.WithVagrantRubyRuntime(b.vagrantRubyRuntime), - runner.WithLogger(b.logger), - runner.ByIdOnly(), // We'll direct target this - runner.WithLocal(b.UI()), // Local mode + runner.WithClient(c.client), + runner.WithVagrantRubyRuntime(c.rubyRuntime), + runner.WithLogger(c.logger), + runner.ByIdOnly(), // We'll direct target this + runner.WithLocal(c.ui), // Local mode ) if err != nil { return nil, err diff --git a/internal/client/server.go b/internal/client/server.go index 70c8bd57c..8c3d279de 100644 --- a/internal/client/server.go +++ b/internal/client/server.go @@ -34,12 +34,12 @@ import ( // 2. If WithLocal was specified and no connection addresses can be // found, this will spin up an in-memory server. // -func (b *Basis) initServerClient(ctx context.Context, cfg *config) (*grpc.ClientConn, error) { - log := b.logger.ResetNamed("vagrant.server") +func (c *Client) initServerClient(ctx context.Context, cfg *clientConfig) (*grpc.ClientConn, error) { + log := c.logger.ResetNamed("vagrant.server") // If we're local, then connection is optional. opts := cfg.connectOpts - if b.local { + if true { // c.localServer { log.Trace("WithLocal set, server credentials optional") opts = append(opts, serverclient.Optional()) } @@ -54,7 +54,7 @@ func (b *Basis) initServerClient(ctx context.Context, cfg *config) (*grpc.Client // If we established a connection if conn != nil { log.Debug("connection established with sourced credentials") - b.cleanup(func() { conn.Close() }) + c.Cleanup(func() error { return conn.Close() }) return conn, nil } @@ -63,7 +63,7 @@ func (b *Basis) initServerClient(ctx context.Context, cfg *config) (*grpc.Client // which is only possible if we configured this client to support local // mode. log.Info("no server credentials found, using in-memory local server") - return b.initLocalServer(ctx) + return c.initLocalServer(ctx) } // initLocalServer starts the local server and configures p.client to @@ -72,14 +72,14 @@ func (b *Basis) initServerClient(ctx context.Context, cfg *config) (*grpc.Client // // If this returns an error, all resources associated with this operation // will be closed, but the project can retry. -func (b *Basis) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err error) { - log := b.logger.ResetNamed("vagrant.server") - b.localServer = true +func (c *Client) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err error) { + log := c.logger.ResetNamed("vagrant.server") + c.localServer = true // We use this pointer to accumulate things we need to clean up // in the case of an error. On success we nil this variable which // doesn't close anything. - var cleanups []func() + var cleanups []func() error // If we encounter an error force all the // local cleanups to run @@ -102,7 +102,7 @@ func (b *Basis) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err er if err != nil { return } - cleanups = append(cleanups, func() { db.Close() }) + cleanups = append(cleanups, func() error { return db.Close() }) // Create our server impl, err := singleprocess.New( @@ -120,7 +120,7 @@ func (b *Basis) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err er if err != nil { return } - cleanups = append(cleanups, func() { ln.Close() }) + cleanups = append(cleanups, func() error { return ln.Close() }) // Create a new cancellation context so we can cancel in the case of an error ctx, cancel := context.WithCancel(ctx) @@ -161,20 +161,15 @@ func (b *Basis) initLocalServer(ctx context.Context) (_ *grpc.ClientConn, err er } // Have the defined cleanups run when the basis is closed - b.cleanup(func() { - for _, c := range cleanups { - c() - } - }) + c.Cleanup(cleanups...) _ = cancel // pacify vet lostcancel return client.Conn(), nil } -func (b *Basis) initVagrantRubyRuntime() (rubyRuntime plugin.ClientProtocol, err error) { +func (c *Client) initVagrantRubyRuntime() (rubyRuntime plugin.ClientProtocol, err error) { // TODO: Update for actual release usage. This is dev only now. - // TODO: We should also locate a free port on startup and use that port _, this_dir, _, _ := runtime.Caller(0) cmd := exec.Command( "bundle", "exec", "vagrant", "serve", @@ -186,34 +181,32 @@ func (b *Basis) initVagrantRubyRuntime() (rubyRuntime plugin.ClientProtocol, err "VAGRANT_LOG_FILE=/tmp/vagrant.log", } - config := serverclient.RubyVagrantPluginConfig(b.logger) + config := serverclient.RubyVagrantPluginConfig(c.logger) config.Cmd = cmd - c := plugin.NewClient(config) - if _, err = c.Start(); err != nil { + rc := plugin.NewClient(config) + if _, err = rc.Start(); err != nil { return } - if rubyRuntime, err = c.Client(); err != nil { + if rubyRuntime, err = rc.Client(); err != nil { return } // Ensure the plugin is halted when the basis is cleaned up - b.cleanup(func() { rubyRuntime.Close() }) + c.Cleanup(func() error { return rubyRuntime.Close() }) return } // negotiateApiVersion negotiates the API version to use and validates // that we are compatible to talk to the server. -func (b *Basis) negotiateApiVersion(ctx context.Context) error { - log := b.logger - - log.Trace("requesting version info from server") - resp, err := b.client.GetVersionInfo(ctx, &empty.Empty{}) +func (c *Client) negotiateApiVersion(ctx context.Context) error { + c.logger.Trace("requesting version info from server") + resp, err := c.client.GetVersionInfo(ctx, &empty.Empty{}) if err != nil { return err } - log.Info("server version info", + c.logger.Info("server version info", "version", resp.Info.Version, "api_min", resp.Info.Api.Minimum, "api_current", resp.Info.Api.Current, @@ -226,6 +219,6 @@ func (b *Basis) negotiateApiVersion(ctx context.Context) error { return err } - log.Info("negotiated api version", "version", vsn) + c.logger.Info("negotiated api version", "version", vsn) return nil }