From 63f0b61ed63b5ebf4cfc80aaa415a6c77a446787 Mon Sep 17 00:00:00 2001 From: Chris Roberts Date: Thu, 8 Apr 2021 16:13:10 -0700 Subject: [PATCH] Add simple broker implementation for use within Ruby runtime --- plugins/commands/serve/broker.rb | 93 ++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 plugins/commands/serve/broker.rb diff --git a/plugins/commands/serve/broker.rb b/plugins/commands/serve/broker.rb new file mode 100644 index 000000000..17cf3a57e --- /dev/null +++ b/plugins/commands/serve/broker.rb @@ -0,0 +1,93 @@ +require "singleton" +require "thread" + +module VagrantPlugins + module CommandServe + # This is a Ruby implementation of the GRPC Broker found + # within the go-plugin library. This can be used to provide + # feature parity with golang based go-plugins + class Broker + include Singleton + + # Protobuf connection information + Info = ::Plugin::ConnInfo + + # Hold connection information received about + # available streams + class Connection + attr_reader :address, :network + + def initialize(address:, network:) + @address = address + @network = network + end + + def to_s + if network == "unix" + "unix:#{address}" + else + address + end + end + end + + # This is the streamer service required to process + # broker information about connection streams. This + # is passed to our internal broker to track streams + # for internal use. + class Streamer < ::Plugin::GRPCBroker::Service + def start_stream(reqs, x) + reqs.map do |req| + Broker.instance.register(req) + end + nil + end + end + + # Create a new broker + def initialize + @streams_m = Mutex.new + @streams = {} + end + + # Register a stream + # + # @param [Plugin::ConnInfo] info Connection information from the broker + # @return [nil] + def register(info) + @streams_m.synchronize do + notifier = @streams[info.service_id] + @streams[info.service_id] = Connection.new( + network: info.network, address: info.address) + if notifier + notifier.broadcast + end + end + nil + end + + # Get connection information for a given ID + # + # @param [Integer] id Identifier for the stream + # @return [Connection] + # @note If stream information has not be received + # for the requested ID it will wait for the + # info. + # TODO(spox): Should we add a timeout here similar + # to within go-plugin (hard coded 5s). + def dial(id) + catch(:found) do + @streams_m.synchronize do + v = @streams[id] + throw :found, v if v.is_a?(Connection) + if v.nil? + v = @streams[id] = ConditionVariable.new + end + v.wait(@streams_m) + @streams[id] + end + end + end + end + end +end