From ef3cde9cd3bf79241d4d53dd264dac5be9235821 Mon Sep 17 00:00:00 2001 From: Chris Roberts Date: Thu, 29 Jul 2021 14:57:37 -0700 Subject: [PATCH] Update in-process builtin factory to return plugin registration --- internal/plugin/builtin.go | 80 ++++++++++++++++++++++++-------------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/internal/plugin/builtin.go b/internal/plugin/builtin.go index 92630d554..c488828d0 100644 --- a/internal/plugin/builtin.go +++ b/internal/plugin/builtin.go @@ -3,8 +3,8 @@ package plugin import ( "context" "fmt" - "strings" "sync" + "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" @@ -35,17 +35,22 @@ func NewBuiltins(ctx context.Context, log hclog.Logger) *Builtin { } func (b *Builtin) ConnectInfo(name string) (*plugin.ReattachConfig, error) { - b.mu.Lock() - defer b.mu.Unlock() - r, ok := b.servers[name] - if !ok { - b.log.Error("failed to locate plugin", "name", name, "servers", b.servers) - return nil, fmt.Errorf("unknown builtin plugin %s", name) + // TODO(spox): need to update with this channel and cancelable context + for i := 0; i < 5; i++ { + b.mu.Lock() + r, ok := b.servers[name] + b.mu.Unlock() + if ok { + return r, nil + } + time.Sleep(1 * time.Second) } - return r, nil + + b.log.Error("failed to locate plugin", "name", name, "servers", b.servers) + return nil, fmt.Errorf("unknown builtin plugin %s", name) } -func (b *Builtin) Add(name string, opts ...sdk.Option) (err error) { +func (b *Builtin) Add(name string, opts ...sdk.Option) (f PluginRegistration, err error) { clCh := make(chan struct{}) reCh := make(chan *plugin.ReattachConfig) cfg := &plugin.ServeTestConfig{ @@ -73,7 +78,8 @@ func (b *Builtin) Add(name string, opts ...sdk.Option) (err error) { b.log.Warn("builtin group has exited", "error", err) b.cancel() }) - return + + return b.Factory(name), nil } func (b *Builtin) Start() { @@ -84,8 +90,8 @@ func (b *Builtin) Close() { b.cancel() } -func (b *Builtin) Factory(name string, typ component.Type) interface{} { - return func(hclog.Logger) (interface{}, error) { +func (b *Builtin) Factory(name string) PluginRegistration { + return func(log hclog.Logger) (p *Plugin, err error) { r, err := b.ConnectInfo(name) if err != nil { return nil, err @@ -97,30 +103,44 @@ func (b *Builtin) Factory(name string, typ component.Type) interface{} { client := plugin.NewClient(config) rpcClient, err := client.Client() if err != nil { - b.log.Error("failed to create rpc client for builtin", "name", name, "error", err) + b.log.Error("failed to create rpc client for builtin", + "name", name, + "error", err) + rpcClient.Close() return nil, err } - var raw interface{} - if typ != component.MapperType { - raw, err = rpcClient.Dispense(strings.ToLower(typ.String())) - if err != nil { - rpcClient.Close() - return nil, err - } - } - - mappers, err := pluginclient.Mappers(client) + raw, err := rpcClient.Dispense("plugininfo") if err != nil { - rpcClient.Close() - return nil, err + log.Error("error requesting builtin plugin information interface", + "plugin", name, + "error", err) + + return + } + info, ok := raw.(component.PluginInfo) + if !ok { + return nil, fmt.Errorf("failed to load builgin plugin information interface", + "plugin", name) } - return &Instance{ - Component: raw, - Mappers: mappers, - Close: func() {}, - }, nil + p = &Plugin{ + Builtin: false, + Client: rpcClient, + Location: fmt.Sprintf("builtin::%s", name), + Name: info.Name(), + Types: info.ComponentTypes(), + components: map[component.Type]*Instance{}, + logger: log, + src: client, + } + + // Close the rpcClient when plugin is closed + p.Closer(func() error { + return rpcClient.Close() + }) + + return } }