From c1da0ab1b47654065a9b27e011f1a2fd231a9afd Mon Sep 17 00:00:00 2001 From: Chris Roberts Date: Mon, 17 May 2021 16:40:11 -0700 Subject: [PATCH] Target updates, labels removal, and broker addition for creating core plugins --- internal/core/basis.go | 83 ++++++++++++++++---------------- internal/core/hook.go | 8 +++- internal/core/operation.go | 33 +------------ internal/core/project.go | 97 +++++++++++++++++++++++++++++--------- internal/core/target.go | 66 ++++++++++++++++++++------ 5 files changed, 174 insertions(+), 113 deletions(-) diff --git a/internal/core/basis.go b/internal/core/basis.go index 3b167ec4a..7146e426a 100644 --- a/internal/core/basis.go +++ b/internal/core/basis.go @@ -12,12 +12,12 @@ import ( "github.com/hashicorp/go-argmapper" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/hashicorp/vagrant-plugin-sdk/component" "github.com/hashicorp/vagrant-plugin-sdk/datadir" + "github.com/hashicorp/vagrant-plugin-sdk/internal-shared/plugincore" "github.com/hashicorp/vagrant-plugin-sdk/internal-shared/protomappers" "github.com/hashicorp/vagrant-plugin-sdk/proto/vagrant_plugin_sdk" "github.com/hashicorp/vagrant-plugin-sdk/terminal" @@ -48,20 +48,9 @@ type Basis struct { lock sync.Mutex client *serverclient.VagrantClient - // NOTE: we can't have a broker to easily just shove at the - // mappers to help us. instead, lets update our scope defs - // in the proto Args so they can be a stream id (when being - // wrapped/passed by a plugin) or a connection end point. This - // will let us serve it from here without a new process being - // spun out, and we can wrap as needed as it gets shuttled around. - // Will just need a direct connection end point like we had for the - // ruby side before the broker there, and the mapper can just use - // what ever is available when doing setup. - grpcServer *grpc.Server - jobInfo *component.JobInfo closers []func() error - UI terminal.UI + ui terminal.UI } // NewBasis creates a new Basis with the given options. @@ -71,6 +60,7 @@ func NewBasis(ctx context.Context, opts ...BasisOption) (b *Basis, err error) { logger: hclog.L(), jobInfo: &component.JobInfo{}, factories: plugin.BaseFactories, + projects: map[string]*Project{}, } for _, opt := range opts { @@ -99,8 +89,8 @@ func NewBasis(ctx context.Context, opts ...BasisOption) (b *Basis, err error) { } // If no UI was provided, initialize a console UI - if b.UI == nil { - b.UI = terminal.ConsoleUI(ctx) + if b.ui == nil { + b.ui = terminal.ConsoleUI(ctx) } // If the mappers aren't already set, load known mappers @@ -119,29 +109,25 @@ func NewBasis(ctx context.Context, opts ...BasisOption) (b *Basis, err error) { // configuration loading if b.config == nil { if b.config, err = config.Load("", ""); err != nil { - return + b.logger.Warn("failed to load config, using stub", "error", err) + b.config = &config.Config{} + err = nil } } - // Finally, init a server to make this basis available - b.grpcServer = grpc.NewServer([]grpc.ServerOption{}...) - vagrant_plugin_sdk.RegisterTargetServiceServer(b.grpcServer, b) - - // Close down the local server when finished with the basis - b.Closer(func() error { - b.grpcServer.GracefulStop() - return nil - }) - // Ensure any modifications to the basis are persisted - b.Closer(func() error { b.Save() }) + b.Closer(func() error { return b.Save() }) b.logger.Info("basis initialized") return } -func (b *Basis) Ui() terminal.UI { - return b.UI +func (b *Basis) UI() (terminal.UI, error) { + return b.ui, nil +} + +func (b *Basis) DataDir() (*datadir.Basis, error) { + return b.dir, nil } func (b *Basis) Ref() interface{} { @@ -242,7 +228,7 @@ func (b *Basis) LoadProject(popts ...ProjectOption) (p *Project, err error) { mappers: b.mappers, factories: b.factories, targets: map[string]*Target{}, - UI: b.UI, + ui: b.ui, } // Apply any options provided @@ -268,16 +254,6 @@ func (b *Basis) LoadProject(popts ...ProjectOption) (p *Project, err error) { } } - // Init server to make this project available - p.grpcServer = grpc.NewServer() - vagrant_plugin_sdk.RegisterProjectServiceServer(p.grpcServer, p) - - // Close down the local server when complete - p.Closer(func() error { - p.grpcServer.GracefulStop() - return nil - }) - // Ensure any modifications to the project are persisted p.Closer(func() error { return p.Save() }) @@ -318,7 +294,7 @@ func (b *Basis) Close() (err error) { // Saves the basis to the db func (b *Basis) Save() (err error) { b.logger.Debug("saving basis to db", "basis", b.ResourceId()) - _, err := b.Client().UpsertBasis(b.ctx, &vagrant_server.UpsertBasisRequest{ + _, err = b.Client().UpsertBasis(b.ctx, &vagrant_server.UpsertBasisRequest{ Basis: b.basis}) if err != nil { b.logger.Trace("failed to save basis", "basis", b.ResourceId(), "error", err) @@ -370,15 +346,33 @@ func (b *Basis) Components(ctx context.Context) ([]*Component, error) { func (b *Basis) Run(ctx context.Context, task *vagrant_server.Task) (err error) { b.logger.Debug("running new task", "basis", b, "task", task) + // Build the component to run cmd, err := b.component(ctx, component.CommandType, task.Component.Name) if err != nil { return err } + // Specialize it if required if _, err = b.specializeComponent(cmd); err != nil { return } + // Pass along to the call + basis := plugincore.NewBasisPlugin(b, b.logger) + streamId, err := wrapInstance(basis, cmd.plugin.Broker, b) + bproto := &vagrant_plugin_sdk.Args_Basis{StreamId: streamId} + + // NOTE(spox): Should this be closed after the dynamic func + // call is complete, or when we tear this down? The latter + // would allow plugins to keep a persistent connection if + // multiple things are being run + b.Closer(func() error { + if c, ok := basis.(closes); ok { + return c.Close() + } + return nil + }) + result, err := b.callDynamicFunc( ctx, b.logger, @@ -386,6 +380,8 @@ func (b *Basis) Run(ctx context.Context, task *vagrant_server.Task) (err error) cmd, cmd.Value.(component.Command).ExecuteFunc(strings.Split(task.CommandName, " ")), argmapper.Typed(task.CliArgs), + argmapper.Typed(bproto), + argmapper.Named("basis", bproto), ) if err != nil || result == nil || result.(int64) != 0 { b.logger.Error("failed to execute command", "type", component.CommandType, "name", task.Component.Name, "error", err) @@ -496,13 +492,14 @@ func (b *Basis) callDynamicFunc( // Be sure that the status is closed after every operation so we don't leak // weird output outside the normal execution. - defer b.UI.Status().Close() + defer b.ui.Status().Close() args = append(args, argmapper.ConverterFunc(b.mappers...), argmapper.Typed( b.jobInfo, b.dir, + b.ui, ), ) @@ -599,7 +596,7 @@ func WithMappers(m ...*argmapper.Func) BasisOption { // WithUI sets the UI to use. If this isn't set, a BasicUI is used. func WithUI(ui terminal.UI) BasisOption { return func(b *Basis) (err error) { - b.UI = ui + b.ui = ui return } } diff --git a/internal/core/hook.go b/internal/core/hook.go index f14353442..a94c85785 100644 --- a/internal/core/hook.go +++ b/internal/core/hook.go @@ -11,11 +11,15 @@ import ( // execHook executes the given hook. This will return any errors. This ignores // on_failure configurations so this must be processed external. -func execHook(ctx context.Context, s scope, log hclog.Logger, h *config.Hook) error { +func execHook(ctx context.Context, s scope, log hclog.Logger, h *config.Hook) (err error) { log.Debug("executing hook", "command", h.Command) // Get our writers - stdout, stderr, err := s.Ui().OutputWriters() + ui, err := s.UI() + if err != nil { + return + } + stdout, stderr, err := ui.OutputWriters() if err != nil { log.Warn("error getting UI stdout/stderr", "err", err) return err diff --git a/internal/core/operation.go b/internal/core/operation.go index 925469cfd..17d25d02a 100644 --- a/internal/core/operation.go +++ b/internal/core/operation.go @@ -19,7 +19,7 @@ import ( ) type scope interface { - Ui() terminal.UI + UI() (terminal.UI, error) Ref() interface{} JobInfo() *component.JobInfo Client() *serverclient.VagrantClient @@ -73,9 +73,6 @@ func doOperation( return nil, nil, err } - // Initialize our labels - msgUpdateLabels(s, op.Labels(s), msg, nil) - // Setup our job id if we have that field. if f := msgField(msg, "JobId"); f.IsValid() { f.Set(reflect.ValueOf(s.JobInfo().Id)) @@ -135,9 +132,6 @@ func doOperation( log.Debug("running local operation") result, doErr = op.Do(ctx, log, s, msg) if doErr == nil { - // Set our labels if we can - msgUpdateLabels(s, op.Labels(s), msg, result) - // No error, our state is success server.StatusSetSuccess(*statusPtr) @@ -198,29 +192,6 @@ func doOperation( return result, msg, nil } -func msgUpdateLabels( - s scope, - base map[string]string, - msg proto.Message, - result interface{}, -) { - // Get our labels field in our proto message. If we don't have one - // then we don't bother doing anything else since labels are moot. - val := msgField(msg, "Labels") - if !val.IsValid() { - return - } - - // Determine any labels we have in our result - var resultLabels map[string]string - if labels, ok := result.(interface{ Labels() map[string]string }); ok { - resultLabels = labels.Labels() - } - - // Merge them - val.Set(reflect.ValueOf(s.mergeLabels(base, resultLabels))) -} - // msgId gets the id of the message by looking for the "Id" field. This // will return empty string if the ID field can't be found for any reason. func msgId(msg proto.Message) string { @@ -246,4 +217,4 @@ func msgField(msg proto.Message, f string) reflect.Value { var _ scope = (*Basis)(nil) var _ scope = (*Project)(nil) -var _ scope = (*Machine)(nil) +var _ scope = (*Target)(nil) diff --git a/internal/core/project.go b/internal/core/project.go index 7fdba9353..6d6f64475 100644 --- a/internal/core/project.go +++ b/internal/core/project.go @@ -11,12 +11,13 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/hashicorp/vagrant-plugin-sdk/component" + "github.com/hashicorp/vagrant-plugin-sdk/core" "github.com/hashicorp/vagrant-plugin-sdk/datadir" + "github.com/hashicorp/vagrant-plugin-sdk/internal-shared/plugincore" "github.com/hashicorp/vagrant-plugin-sdk/proto/vagrant_plugin_sdk" "github.com/hashicorp/vagrant-plugin-sdk/terminal" @@ -42,8 +43,6 @@ type Project struct { dir *datadir.Project mappers []*argmapper.Func - grpcServer *grpc.Server - // jobInfo is the base job info for executed functions. jobInfo *component.JobInfo @@ -56,13 +55,60 @@ type Project struct { // UI is the terminal UI to use for messages related to the project // as a whole. These messages will show up unprefixed for example compared // to the app-specific UI. - UI terminal.UI + ui terminal.UI } -func (p *Project) Ui() terminal.UI { - return p.UI +// Start required core.Project interface functions +func (p *Project) UI() (terminal.UI, error) { + return p.ui, nil } +func (p *Project) CWD() (path string, err error) { + // TODO: implement + return +} + +func (p *Project) DataDir() (*datadir.Project, error) { + return p.dir, nil +} + +func (p *Project) VagrantfileName() (name string, err error) { + // TODO: implement + return +} + +func (p *Project) Home() (path string, err error) { + // TODO: implement + return +} + +func (p *Project) LocalData() (path string, err error) { + // TODO: implement + return +} + +func (p *Project) Tmp() (path string, err error) { + // TODO: implement + return +} + +func (p *Project) DefaultPrivateKey() (path string, err error) { + // TODO: implement + return +} + +func (p *Project) Host() (host core.Host, err error) { + // TODO: implement + return +} + +func (p *Project) MachineNames() (names []string, err error) { + // TODO: implement + return +} + +// End required core.Project interface functions + func (p *Project) Name() string { return p.project.Name } @@ -93,7 +139,7 @@ func (p *Project) LoadTarget(topts ...TargetOption) (t *Target, err error) { ctx: p.ctx, project: p, logger: p.logger.Named("target"), - UI: p.UI, + ui: p.ui, } // Apply any options provided @@ -108,20 +154,10 @@ func (p *Project) LoadTarget(topts ...TargetOption) (t *Target, err error) { } // If the machine is already loaded, return that - if target, ok := p.targets[t.Name()]; ok { + if target, ok := p.targets[t.target.Name]; ok { return target, nil } - // Init server to make this target available - t.grpcServer = grpc.NewServer() - vagrant_plugin_sdk.RegisterTargetServiceServer(t.grpcServer, t) - - // Close down the local server when complete - t.Closer(func() error { - t.grpcServer.GracefulStop() - return nil - }) - // Ensure any modifications to the target are persisted t.Closer(func() error { return t.Save() }) @@ -183,6 +219,18 @@ func (p *Project) Run(ctx context.Context, task *vagrant_server.Task) (err error return } + // Pass along to the call + project := plugincore.NewProjectPlugin(p, p.logger) + streamId, err := wrapInstance(project, cmd.plugin.Broker, p) + pproto := &vagrant_plugin_sdk.Args_Project{StreamId: streamId} + + p.Closer(func() error { + if c, ok := project.(closes); ok { + return c.Close() + } + return nil + }) + result, err := p.callDynamicFunc( ctx, p.logger, @@ -190,6 +238,8 @@ func (p *Project) Run(ctx context.Context, task *vagrant_server.Task) (err error cmd, cmd.Value.(component.Command).ExecuteFunc(strings.Split(task.CommandName, " ")), argmapper.Typed(task.CliArgs), + argmapper.Typed(pproto), + argmapper.Named("project", pproto), ) if err != nil || result == nil || result.(int64) != 0 { p.logger.Error("failed to execute command", "type", component.CommandType, "name", task.Component.Name, "result", result, "error", err) @@ -235,7 +285,7 @@ func (p *Project) Close() (err error) { // Saves the project to the db func (p *Project) Save() (err error) { p.logger.Trace("saving project to db", "project", p.ResourceId()) - _, err := p.Client().UpsertProject(p.ctx, &vagrant_server.UpsertProjectRequest{ + _, err = p.Client().UpsertProject(p.ctx, &vagrant_server.UpsertProjectRequest{ Project: p.project}) if err != nil { p.logger.Trace("failed to save project", "project", p.ResourceId()) @@ -245,11 +295,11 @@ func (p *Project) Save() (err error) { // Saves the project to the db as well as any targets that have been loaded func (p *Project) SaveFull() (err error) { - p.logger.Debug("performing full save", "project", p.ResourceId()) + p.logger.Debug("performing full save", "project", p.project.ResourceId) for _, t := range p.targets { - p.logger.Trace("saving target", "project", p.ResourceId(), "target", t.ResourceId()) + p.logger.Trace("saving target", "project", p.project.ResourceId, "target", t.target.ResourceId) if terr := t.Save(); terr != nil { - p.logger.Trace("error while saving target", "target", t.ResourceId(), "error", err) + p.logger.Trace("error while saving target", "target", t.target.ResourceId, "error", err) err = multierror.Append(err, terr) } } @@ -270,7 +320,7 @@ func (p *Project) callDynamicFunc( // Be sure that the status is closed after every operation so we don't leak // weird output outside the normal execution. - defer p.UI.Status().Close() + defer p.ui.Status().Close() args = append(args, argmapper.ConverterFunc(p.mappers...), @@ -420,3 +470,4 @@ func WithProjectRef(r *vagrant_plugin_sdk.Ref_Project) ProjectOption { } var _ *Project = (*Project)(nil) +var _ core.Project = (*Project)(nil) diff --git a/internal/core/target.go b/internal/core/target.go index 4fb24b55b..766c102d7 100644 --- a/internal/core/target.go +++ b/internal/core/target.go @@ -14,9 +14,12 @@ import ( "github.com/hashicorp/vagrant/internal/plugin" "github.com/hashicorp/vagrant/internal/serverclient" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/vagrant-plugin-sdk/component" + "github.com/hashicorp/vagrant-plugin-sdk/core" "github.com/hashicorp/vagrant-plugin-sdk/datadir" + "github.com/hashicorp/vagrant-plugin-sdk/internal-shared/plugincore" "github.com/hashicorp/vagrant-plugin-sdk/proto/vagrant_plugin_sdk" "github.com/hashicorp/vagrant-plugin-sdk/terminal" @@ -35,11 +38,11 @@ type Target struct { lock sync.Mutex jobInfo *component.JobInfo closers []func() error - UI terminal.UI + ui terminal.UI } -func (t *Target) Ui() terminal.UI { - return t.UI +func (t *Target) UI() (terminal.UI, error) { + return t.ui, nil } func (t *Target) Ref() interface{} { @@ -49,12 +52,32 @@ func (t *Target) Ref() interface{} { } } -func (t *Target) Name() string { - return t.target.Name +func (t *Target) Name() (string, error) { + return t.target.Name, nil } -func (t *Target) ResourceId() string { - return t.target.ResourceId +func (t *Target) ResourceId() (string, error) { + return t.target.ResourceId, nil +} + +func (t *Target) Project() (core.Project, error) { + return t.project, nil +} + +func (t *Target) Metadata() (map[string]string, error) { + return t.target.Metadata.Metadata, nil +} + +func (t *Target) DataDir() (*datadir.Target, error) { + return t.dir, nil +} + +func (t *Target) State() (core.State, error) { + return core.UNKNOWN, nil +} + +func (t *Target) Record() (*anypb.Any, error) { + return t.target.Record, nil } func (t *Target) JobInfo() *component.JobInfo { @@ -87,11 +110,11 @@ func (t *Target) Close() (err error) { } func (t *Target) Save() (err error) { - t.logger.Debug("saving target to db", "target", t.ResourceId()) - _, err := t.Client().UpsertTarget(t.ctx, &vagrant_server.UpsertTargetRequest{ + t.logger.Debug("saving target to db", "target", t.target.ResourceId) + _, err = t.Client().UpsertTarget(t.ctx, &vagrant_server.UpsertTargetRequest{ Target: t.target}) if err != nil { - t.logger.Trace("failed to save target", "target", t.ResourceId(), "error", err) + t.logger.Trace("failed to save target", "target", t.target.ResourceId, "error", err) } return } @@ -114,6 +137,18 @@ func (t *Target) Run(ctx context.Context, task *vagrant_server.Task) (err error) return } + // Pass along to the call + target := plugincore.NewTargetPlugin(t, t.logger) + streamId, err := wrapInstance(target, cmd.plugin.Broker, t) + tproto := &vagrant_plugin_sdk.Args_Project{StreamId: streamId} + + t.Closer(func() error { + if c, ok := target.(closes); ok { + return c.Close() + } + return nil + }) + result, err := t.callDynamicFunc( ctx, t.logger, @@ -121,6 +156,8 @@ func (t *Target) Run(ctx context.Context, task *vagrant_server.Task) (err error) cmd, cmd.Value.(component.Command).ExecuteFunc(strings.Split(task.CommandName, " ")), argmapper.Typed(task.CliArgs), + argmapper.Typed(tproto), + argmapper.Named("target", target), ) if err != nil || result == nil || result.(int64) != 0 { @@ -142,7 +179,7 @@ func (t *Target) callDynamicFunc( // Be sure that the status is closed after every operation so we don't leak // weird output outside the normal execution. - defer t.UI.Status().Close() + defer t.ui.Status().Close() args = append(args, argmapper.Typed( @@ -180,7 +217,7 @@ func WithTargetName(name string) TargetOption { return } for _, target := range t.project.targets { - if target.Name() != name { + if target.target.Name != name { continue } var result *vagrant_server.GetTargetResponse @@ -212,7 +249,8 @@ func WithTargetRef(r *vagrant_plugin_sdk.Ref_Target) TargetOption { result, err := t.Client().FindTarget(t.ctx, &vagrant_server.FindTargetRequest{ Target: &vagrant_server.Target{ - Name: r.Name, + Name: r.Name, + Project: r.Project, }, }, ) @@ -242,7 +280,7 @@ func WithTargetRef(r *vagrant_plugin_sdk.Ref_Target) TargetOption { return errors.New("target project configuration is invalid") } t.target = target - t.project.targets[t.Name()] = t + t.project.targets[t.target.Name] = t return } }