Target updates, labels removal, and broker addition for creating core plugins

This commit is contained in:
Chris Roberts 2021-05-17 16:40:11 -07:00 committed by Paul Hinze
parent dcb6f4a9be
commit c1da0ab1b4
No known key found for this signature in database
GPG Key ID: B69DEDF2D55501C0
5 changed files with 174 additions and 113 deletions

View File

@ -12,12 +12,12 @@ import (
"github.com/hashicorp/go-argmapper"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/vagrant-plugin-sdk/component"
"github.com/hashicorp/vagrant-plugin-sdk/datadir"
"github.com/hashicorp/vagrant-plugin-sdk/internal-shared/plugincore"
"github.com/hashicorp/vagrant-plugin-sdk/internal-shared/protomappers"
"github.com/hashicorp/vagrant-plugin-sdk/proto/vagrant_plugin_sdk"
"github.com/hashicorp/vagrant-plugin-sdk/terminal"
@ -48,20 +48,9 @@ type Basis struct {
lock sync.Mutex
client *serverclient.VagrantClient
// NOTE: we can't have a broker to easily just shove at the
// mappers to help us. instead, lets update our scope defs
// in the proto Args so they can be a stream id (when being
// wrapped/passed by a plugin) or a connection end point. This
// will let us serve it from here without a new process being
// spun out, and we can wrap as needed as it gets shuttled around.
// Will just need a direct connection end point like we had for the
// ruby side before the broker there, and the mapper can just use
// what ever is available when doing setup.
grpcServer *grpc.Server
jobInfo *component.JobInfo
closers []func() error
UI terminal.UI
ui terminal.UI
}
// NewBasis creates a new Basis with the given options.
@ -71,6 +60,7 @@ func NewBasis(ctx context.Context, opts ...BasisOption) (b *Basis, err error) {
logger: hclog.L(),
jobInfo: &component.JobInfo{},
factories: plugin.BaseFactories,
projects: map[string]*Project{},
}
for _, opt := range opts {
@ -99,8 +89,8 @@ func NewBasis(ctx context.Context, opts ...BasisOption) (b *Basis, err error) {
}
// If no UI was provided, initialize a console UI
if b.UI == nil {
b.UI = terminal.ConsoleUI(ctx)
if b.ui == nil {
b.ui = terminal.ConsoleUI(ctx)
}
// If the mappers aren't already set, load known mappers
@ -119,29 +109,25 @@ func NewBasis(ctx context.Context, opts ...BasisOption) (b *Basis, err error) {
// configuration loading
if b.config == nil {
if b.config, err = config.Load("", ""); err != nil {
return
b.logger.Warn("failed to load config, using stub", "error", err)
b.config = &config.Config{}
err = nil
}
}
// Finally, init a server to make this basis available
b.grpcServer = grpc.NewServer([]grpc.ServerOption{}...)
vagrant_plugin_sdk.RegisterTargetServiceServer(b.grpcServer, b)
// Close down the local server when finished with the basis
b.Closer(func() error {
b.grpcServer.GracefulStop()
return nil
})
// Ensure any modifications to the basis are persisted
b.Closer(func() error { b.Save() })
b.Closer(func() error { return b.Save() })
b.logger.Info("basis initialized")
return
}
func (b *Basis) Ui() terminal.UI {
return b.UI
func (b *Basis) UI() (terminal.UI, error) {
return b.ui, nil
}
func (b *Basis) DataDir() (*datadir.Basis, error) {
return b.dir, nil
}
func (b *Basis) Ref() interface{} {
@ -242,7 +228,7 @@ func (b *Basis) LoadProject(popts ...ProjectOption) (p *Project, err error) {
mappers: b.mappers,
factories: b.factories,
targets: map[string]*Target{},
UI: b.UI,
ui: b.ui,
}
// Apply any options provided
@ -268,16 +254,6 @@ func (b *Basis) LoadProject(popts ...ProjectOption) (p *Project, err error) {
}
}
// Init server to make this project available
p.grpcServer = grpc.NewServer()
vagrant_plugin_sdk.RegisterProjectServiceServer(p.grpcServer, p)
// Close down the local server when complete
p.Closer(func() error {
p.grpcServer.GracefulStop()
return nil
})
// Ensure any modifications to the project are persisted
p.Closer(func() error { return p.Save() })
@ -318,7 +294,7 @@ func (b *Basis) Close() (err error) {
// Saves the basis to the db
func (b *Basis) Save() (err error) {
b.logger.Debug("saving basis to db", "basis", b.ResourceId())
_, err := b.Client().UpsertBasis(b.ctx, &vagrant_server.UpsertBasisRequest{
_, err = b.Client().UpsertBasis(b.ctx, &vagrant_server.UpsertBasisRequest{
Basis: b.basis})
if err != nil {
b.logger.Trace("failed to save basis", "basis", b.ResourceId(), "error", err)
@ -370,15 +346,33 @@ func (b *Basis) Components(ctx context.Context) ([]*Component, error) {
func (b *Basis) Run(ctx context.Context, task *vagrant_server.Task) (err error) {
b.logger.Debug("running new task", "basis", b, "task", task)
// Build the component to run
cmd, err := b.component(ctx, component.CommandType, task.Component.Name)
if err != nil {
return err
}
// Specialize it if required
if _, err = b.specializeComponent(cmd); err != nil {
return
}
// Pass along to the call
basis := plugincore.NewBasisPlugin(b, b.logger)
streamId, err := wrapInstance(basis, cmd.plugin.Broker, b)
bproto := &vagrant_plugin_sdk.Args_Basis{StreamId: streamId}
// NOTE(spox): Should this be closed after the dynamic func
// call is complete, or when we tear this down? The latter
// would allow plugins to keep a persistent connection if
// multiple things are being run
b.Closer(func() error {
if c, ok := basis.(closes); ok {
return c.Close()
}
return nil
})
result, err := b.callDynamicFunc(
ctx,
b.logger,
@ -386,6 +380,8 @@ func (b *Basis) Run(ctx context.Context, task *vagrant_server.Task) (err error)
cmd,
cmd.Value.(component.Command).ExecuteFunc(strings.Split(task.CommandName, " ")),
argmapper.Typed(task.CliArgs),
argmapper.Typed(bproto),
argmapper.Named("basis", bproto),
)
if err != nil || result == nil || result.(int64) != 0 {
b.logger.Error("failed to execute command", "type", component.CommandType, "name", task.Component.Name, "error", err)
@ -496,13 +492,14 @@ func (b *Basis) callDynamicFunc(
// Be sure that the status is closed after every operation so we don't leak
// weird output outside the normal execution.
defer b.UI.Status().Close()
defer b.ui.Status().Close()
args = append(args,
argmapper.ConverterFunc(b.mappers...),
argmapper.Typed(
b.jobInfo,
b.dir,
b.ui,
),
)
@ -599,7 +596,7 @@ func WithMappers(m ...*argmapper.Func) BasisOption {
// WithUI sets the UI to use. If this isn't set, a BasicUI is used.
func WithUI(ui terminal.UI) BasisOption {
return func(b *Basis) (err error) {
b.UI = ui
b.ui = ui
return
}
}

View File

@ -11,11 +11,15 @@ import (
// execHook executes the given hook. This will return any errors. This ignores
// on_failure configurations so this must be processed external.
func execHook(ctx context.Context, s scope, log hclog.Logger, h *config.Hook) error {
func execHook(ctx context.Context, s scope, log hclog.Logger, h *config.Hook) (err error) {
log.Debug("executing hook", "command", h.Command)
// Get our writers
stdout, stderr, err := s.Ui().OutputWriters()
ui, err := s.UI()
if err != nil {
return
}
stdout, stderr, err := ui.OutputWriters()
if err != nil {
log.Warn("error getting UI stdout/stderr", "err", err)
return err

View File

@ -19,7 +19,7 @@ import (
)
type scope interface {
Ui() terminal.UI
UI() (terminal.UI, error)
Ref() interface{}
JobInfo() *component.JobInfo
Client() *serverclient.VagrantClient
@ -73,9 +73,6 @@ func doOperation(
return nil, nil, err
}
// Initialize our labels
msgUpdateLabels(s, op.Labels(s), msg, nil)
// Setup our job id if we have that field.
if f := msgField(msg, "JobId"); f.IsValid() {
f.Set(reflect.ValueOf(s.JobInfo().Id))
@ -135,9 +132,6 @@ func doOperation(
log.Debug("running local operation")
result, doErr = op.Do(ctx, log, s, msg)
if doErr == nil {
// Set our labels if we can
msgUpdateLabels(s, op.Labels(s), msg, result)
// No error, our state is success
server.StatusSetSuccess(*statusPtr)
@ -198,29 +192,6 @@ func doOperation(
return result, msg, nil
}
func msgUpdateLabels(
s scope,
base map[string]string,
msg proto.Message,
result interface{},
) {
// Get our labels field in our proto message. If we don't have one
// then we don't bother doing anything else since labels are moot.
val := msgField(msg, "Labels")
if !val.IsValid() {
return
}
// Determine any labels we have in our result
var resultLabels map[string]string
if labels, ok := result.(interface{ Labels() map[string]string }); ok {
resultLabels = labels.Labels()
}
// Merge them
val.Set(reflect.ValueOf(s.mergeLabels(base, resultLabels)))
}
// msgId gets the id of the message by looking for the "Id" field. This
// will return empty string if the ID field can't be found for any reason.
func msgId(msg proto.Message) string {
@ -246,4 +217,4 @@ func msgField(msg proto.Message, f string) reflect.Value {
var _ scope = (*Basis)(nil)
var _ scope = (*Project)(nil)
var _ scope = (*Machine)(nil)
var _ scope = (*Target)(nil)

View File

@ -11,12 +11,13 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/vagrant-plugin-sdk/component"
"github.com/hashicorp/vagrant-plugin-sdk/core"
"github.com/hashicorp/vagrant-plugin-sdk/datadir"
"github.com/hashicorp/vagrant-plugin-sdk/internal-shared/plugincore"
"github.com/hashicorp/vagrant-plugin-sdk/proto/vagrant_plugin_sdk"
"github.com/hashicorp/vagrant-plugin-sdk/terminal"
@ -42,8 +43,6 @@ type Project struct {
dir *datadir.Project
mappers []*argmapper.Func
grpcServer *grpc.Server
// jobInfo is the base job info for executed functions.
jobInfo *component.JobInfo
@ -56,13 +55,60 @@ type Project struct {
// UI is the terminal UI to use for messages related to the project
// as a whole. These messages will show up unprefixed for example compared
// to the app-specific UI.
UI terminal.UI
ui terminal.UI
}
func (p *Project) Ui() terminal.UI {
return p.UI
// Start required core.Project interface functions
func (p *Project) UI() (terminal.UI, error) {
return p.ui, nil
}
func (p *Project) CWD() (path string, err error) {
// TODO: implement
return
}
func (p *Project) DataDir() (*datadir.Project, error) {
return p.dir, nil
}
func (p *Project) VagrantfileName() (name string, err error) {
// TODO: implement
return
}
func (p *Project) Home() (path string, err error) {
// TODO: implement
return
}
func (p *Project) LocalData() (path string, err error) {
// TODO: implement
return
}
func (p *Project) Tmp() (path string, err error) {
// TODO: implement
return
}
func (p *Project) DefaultPrivateKey() (path string, err error) {
// TODO: implement
return
}
func (p *Project) Host() (host core.Host, err error) {
// TODO: implement
return
}
func (p *Project) MachineNames() (names []string, err error) {
// TODO: implement
return
}
// End required core.Project interface functions
func (p *Project) Name() string {
return p.project.Name
}
@ -93,7 +139,7 @@ func (p *Project) LoadTarget(topts ...TargetOption) (t *Target, err error) {
ctx: p.ctx,
project: p,
logger: p.logger.Named("target"),
UI: p.UI,
ui: p.ui,
}
// Apply any options provided
@ -108,20 +154,10 @@ func (p *Project) LoadTarget(topts ...TargetOption) (t *Target, err error) {
}
// If the machine is already loaded, return that
if target, ok := p.targets[t.Name()]; ok {
if target, ok := p.targets[t.target.Name]; ok {
return target, nil
}
// Init server to make this target available
t.grpcServer = grpc.NewServer()
vagrant_plugin_sdk.RegisterTargetServiceServer(t.grpcServer, t)
// Close down the local server when complete
t.Closer(func() error {
t.grpcServer.GracefulStop()
return nil
})
// Ensure any modifications to the target are persisted
t.Closer(func() error { return t.Save() })
@ -183,6 +219,18 @@ func (p *Project) Run(ctx context.Context, task *vagrant_server.Task) (err error
return
}
// Pass along to the call
project := plugincore.NewProjectPlugin(p, p.logger)
streamId, err := wrapInstance(project, cmd.plugin.Broker, p)
pproto := &vagrant_plugin_sdk.Args_Project{StreamId: streamId}
p.Closer(func() error {
if c, ok := project.(closes); ok {
return c.Close()
}
return nil
})
result, err := p.callDynamicFunc(
ctx,
p.logger,
@ -190,6 +238,8 @@ func (p *Project) Run(ctx context.Context, task *vagrant_server.Task) (err error
cmd,
cmd.Value.(component.Command).ExecuteFunc(strings.Split(task.CommandName, " ")),
argmapper.Typed(task.CliArgs),
argmapper.Typed(pproto),
argmapper.Named("project", pproto),
)
if err != nil || result == nil || result.(int64) != 0 {
p.logger.Error("failed to execute command", "type", component.CommandType, "name", task.Component.Name, "result", result, "error", err)
@ -235,7 +285,7 @@ func (p *Project) Close() (err error) {
// Saves the project to the db
func (p *Project) Save() (err error) {
p.logger.Trace("saving project to db", "project", p.ResourceId())
_, err := p.Client().UpsertProject(p.ctx, &vagrant_server.UpsertProjectRequest{
_, err = p.Client().UpsertProject(p.ctx, &vagrant_server.UpsertProjectRequest{
Project: p.project})
if err != nil {
p.logger.Trace("failed to save project", "project", p.ResourceId())
@ -245,11 +295,11 @@ func (p *Project) Save() (err error) {
// Saves the project to the db as well as any targets that have been loaded
func (p *Project) SaveFull() (err error) {
p.logger.Debug("performing full save", "project", p.ResourceId())
p.logger.Debug("performing full save", "project", p.project.ResourceId)
for _, t := range p.targets {
p.logger.Trace("saving target", "project", p.ResourceId(), "target", t.ResourceId())
p.logger.Trace("saving target", "project", p.project.ResourceId, "target", t.target.ResourceId)
if terr := t.Save(); terr != nil {
p.logger.Trace("error while saving target", "target", t.ResourceId(), "error", err)
p.logger.Trace("error while saving target", "target", t.target.ResourceId, "error", err)
err = multierror.Append(err, terr)
}
}
@ -270,7 +320,7 @@ func (p *Project) callDynamicFunc(
// Be sure that the status is closed after every operation so we don't leak
// weird output outside the normal execution.
defer p.UI.Status().Close()
defer p.ui.Status().Close()
args = append(args,
argmapper.ConverterFunc(p.mappers...),
@ -420,3 +470,4 @@ func WithProjectRef(r *vagrant_plugin_sdk.Ref_Project) ProjectOption {
}
var _ *Project = (*Project)(nil)
var _ core.Project = (*Project)(nil)

View File

@ -14,9 +14,12 @@ import (
"github.com/hashicorp/vagrant/internal/plugin"
"github.com/hashicorp/vagrant/internal/serverclient"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/vagrant-plugin-sdk/component"
"github.com/hashicorp/vagrant-plugin-sdk/core"
"github.com/hashicorp/vagrant-plugin-sdk/datadir"
"github.com/hashicorp/vagrant-plugin-sdk/internal-shared/plugincore"
"github.com/hashicorp/vagrant-plugin-sdk/proto/vagrant_plugin_sdk"
"github.com/hashicorp/vagrant-plugin-sdk/terminal"
@ -35,11 +38,11 @@ type Target struct {
lock sync.Mutex
jobInfo *component.JobInfo
closers []func() error
UI terminal.UI
ui terminal.UI
}
func (t *Target) Ui() terminal.UI {
return t.UI
func (t *Target) UI() (terminal.UI, error) {
return t.ui, nil
}
func (t *Target) Ref() interface{} {
@ -49,12 +52,32 @@ func (t *Target) Ref() interface{} {
}
}
func (t *Target) Name() string {
return t.target.Name
func (t *Target) Name() (string, error) {
return t.target.Name, nil
}
func (t *Target) ResourceId() string {
return t.target.ResourceId
func (t *Target) ResourceId() (string, error) {
return t.target.ResourceId, nil
}
func (t *Target) Project() (core.Project, error) {
return t.project, nil
}
func (t *Target) Metadata() (map[string]string, error) {
return t.target.Metadata.Metadata, nil
}
func (t *Target) DataDir() (*datadir.Target, error) {
return t.dir, nil
}
func (t *Target) State() (core.State, error) {
return core.UNKNOWN, nil
}
func (t *Target) Record() (*anypb.Any, error) {
return t.target.Record, nil
}
func (t *Target) JobInfo() *component.JobInfo {
@ -87,11 +110,11 @@ func (t *Target) Close() (err error) {
}
func (t *Target) Save() (err error) {
t.logger.Debug("saving target to db", "target", t.ResourceId())
_, err := t.Client().UpsertTarget(t.ctx, &vagrant_server.UpsertTargetRequest{
t.logger.Debug("saving target to db", "target", t.target.ResourceId)
_, err = t.Client().UpsertTarget(t.ctx, &vagrant_server.UpsertTargetRequest{
Target: t.target})
if err != nil {
t.logger.Trace("failed to save target", "target", t.ResourceId(), "error", err)
t.logger.Trace("failed to save target", "target", t.target.ResourceId, "error", err)
}
return
}
@ -114,6 +137,18 @@ func (t *Target) Run(ctx context.Context, task *vagrant_server.Task) (err error)
return
}
// Pass along to the call
target := plugincore.NewTargetPlugin(t, t.logger)
streamId, err := wrapInstance(target, cmd.plugin.Broker, t)
tproto := &vagrant_plugin_sdk.Args_Project{StreamId: streamId}
t.Closer(func() error {
if c, ok := target.(closes); ok {
return c.Close()
}
return nil
})
result, err := t.callDynamicFunc(
ctx,
t.logger,
@ -121,6 +156,8 @@ func (t *Target) Run(ctx context.Context, task *vagrant_server.Task) (err error)
cmd,
cmd.Value.(component.Command).ExecuteFunc(strings.Split(task.CommandName, " ")),
argmapper.Typed(task.CliArgs),
argmapper.Typed(tproto),
argmapper.Named("target", target),
)
if err != nil || result == nil || result.(int64) != 0 {
@ -142,7 +179,7 @@ func (t *Target) callDynamicFunc(
// Be sure that the status is closed after every operation so we don't leak
// weird output outside the normal execution.
defer t.UI.Status().Close()
defer t.ui.Status().Close()
args = append(args,
argmapper.Typed(
@ -180,7 +217,7 @@ func WithTargetName(name string) TargetOption {
return
}
for _, target := range t.project.targets {
if target.Name() != name {
if target.target.Name != name {
continue
}
var result *vagrant_server.GetTargetResponse
@ -212,7 +249,8 @@ func WithTargetRef(r *vagrant_plugin_sdk.Ref_Target) TargetOption {
result, err := t.Client().FindTarget(t.ctx,
&vagrant_server.FindTargetRequest{
Target: &vagrant_server.Target{
Name: r.Name,
Name: r.Name,
Project: r.Project,
},
},
)
@ -242,7 +280,7 @@ func WithTargetRef(r *vagrant_plugin_sdk.Ref_Target) TargetOption {
return errors.New("target project configuration is invalid")
}
t.target = target
t.project.targets[t.Name()] = t
t.project.targets[t.target.Name] = t
return
}
}