Update in-process builtin factory to return plugin registration

This commit is contained in:
Chris Roberts 2021-07-29 14:57:37 -07:00 committed by Paul Hinze
parent 63fe341ca5
commit ef3cde9cd3
No known key found for this signature in database
GPG Key ID: B69DEDF2D55501C0

View File

@ -3,8 +3,8 @@ package plugin
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
@ -35,17 +35,22 @@ func NewBuiltins(ctx context.Context, log hclog.Logger) *Builtin {
}
func (b *Builtin) ConnectInfo(name string) (*plugin.ReattachConfig, error) {
b.mu.Lock()
defer b.mu.Unlock()
r, ok := b.servers[name]
if !ok {
b.log.Error("failed to locate plugin", "name", name, "servers", b.servers)
return nil, fmt.Errorf("unknown builtin plugin %s", name)
// TODO(spox): need to update with this channel and cancelable context
for i := 0; i < 5; i++ {
b.mu.Lock()
r, ok := b.servers[name]
b.mu.Unlock()
if ok {
return r, nil
}
time.Sleep(1 * time.Second)
}
return r, nil
b.log.Error("failed to locate plugin", "name", name, "servers", b.servers)
return nil, fmt.Errorf("unknown builtin plugin %s", name)
}
func (b *Builtin) Add(name string, opts ...sdk.Option) (err error) {
func (b *Builtin) Add(name string, opts ...sdk.Option) (f PluginRegistration, err error) {
clCh := make(chan struct{})
reCh := make(chan *plugin.ReattachConfig)
cfg := &plugin.ServeTestConfig{
@ -73,7 +78,8 @@ func (b *Builtin) Add(name string, opts ...sdk.Option) (err error) {
b.log.Warn("builtin group has exited", "error", err)
b.cancel()
})
return
return b.Factory(name), nil
}
func (b *Builtin) Start() {
@ -84,8 +90,8 @@ func (b *Builtin) Close() {
b.cancel()
}
func (b *Builtin) Factory(name string, typ component.Type) interface{} {
return func(hclog.Logger) (interface{}, error) {
func (b *Builtin) Factory(name string) PluginRegistration {
return func(log hclog.Logger) (p *Plugin, err error) {
r, err := b.ConnectInfo(name)
if err != nil {
return nil, err
@ -97,30 +103,44 @@ func (b *Builtin) Factory(name string, typ component.Type) interface{} {
client := plugin.NewClient(config)
rpcClient, err := client.Client()
if err != nil {
b.log.Error("failed to create rpc client for builtin", "name", name, "error", err)
b.log.Error("failed to create rpc client for builtin",
"name", name,
"error", err)
rpcClient.Close()
return nil, err
}
var raw interface{}
if typ != component.MapperType {
raw, err = rpcClient.Dispense(strings.ToLower(typ.String()))
if err != nil {
rpcClient.Close()
return nil, err
}
}
mappers, err := pluginclient.Mappers(client)
raw, err := rpcClient.Dispense("plugininfo")
if err != nil {
rpcClient.Close()
return nil, err
log.Error("error requesting builtin plugin information interface",
"plugin", name,
"error", err)
return
}
info, ok := raw.(component.PluginInfo)
if !ok {
return nil, fmt.Errorf("failed to load builgin plugin information interface",
"plugin", name)
}
return &Instance{
Component: raw,
Mappers: mappers,
Close: func() {},
}, nil
p = &Plugin{
Builtin: false,
Client: rpcClient,
Location: fmt.Sprintf("builtin::%s", name),
Name: info.Name(),
Types: info.ComponentTypes(),
components: map[component.Type]*Instance{},
logger: log,
src: client,
}
// Close the rpcClient when plugin is closed
p.Closer(func() error {
return rpcClient.Close()
})
return
}
}