2022-04-25 12:26:17 -05:00

162 lines
4.4 KiB
Ruby

require "singleton"
require "thread"
module VagrantPlugins
module CommandServe
# This is a Ruby implementation of the GRPC Broker found
# within the go-plugin library. This can be used to provide
# feature parity with golang based go-plugins
class Broker
# Broker specific errors
class Error < StandardError
class StreamTimeout < Error; end
end
# Maximum number of seconds to wait for receiving stream connection information
STREAM_WAIT_TIMEOUT = 5
# Protobuf connection information
Info = ::Plugin::ConnInfo
# Hold connection information received about
# available streams
class Connection
attr_reader :address, :network
def initialize(address:, network:)
@address = address
@network = network
end
def to_s
if network == "unix"
"unix:#{address}"
else
address
end
end
end
# This is the streamer service required to process
# broker information about connection streams. This
# is passed to our internal broker to track streams
# for internal use.
class Streamer < ::Plugin::GRPCBroker::Service
# @return [Broker]
attr_reader :broker
# Create a new streamer service
#
# @param broker [Broker] broker to register requests
# @return [self]
def initialize(broker:)
super()
@broker = broker
end
# Handle stream requests which include connection
# information for stream IDs
def start_stream(reqs, x)
reqs.map do |req|
broker.register(req)
end
nil
end
end
# Create a server side broker
def initialize(bind:, ports:)
@bind = bind
@ports = ports
@streams_m = Mutex.new
@streams = {}
end
# Register a stream
#
# @param [Plugin::ConnInfo] info Connection information from the broker
# @return [nil]
def register(info)
@streams_m.synchronize do
notifier = @streams[info.service_id]
@streams[info.service_id] = Connection.new(
network: info.network, address: info.address)
if notifier
notifier.broadcast
end
end
nil
end
# Get connection information for a given ID
#
# @param [Integer] id Identifier for the stream
# @return [Connection]
# @note If stream information has not be received
# for the requested ID it will wait for the
# info.
def dial(id)
catch(:found) do
@streams_m.synchronize do
v = @streams[id]
throw :found, v if v.is_a?(Connection)
if v.nil?
v = @streams[id] = ConditionVariable.new
end
v.wait(@streams_m, STREAM_WAIT_TIMEOUT)
if v == @streams[id]
raise Error::StreamTimeout,
"Failed to receive connection information for stream `#{id}'"
end
@streams[id]
end
end
end
def inspect
to_s
end
def client_for(stub)
end
class Client
# Create a new broker
def initialize(client:)
@stream_id_m = Mutex.new
@stream_id = 0
@servers_m = Mutex.new
@servers = {}
end
# @return [Integer] next stream id to use
def next_id
@stream_id_m.synchronize do
@stream_id += 1
end
end
# Accept a specific stream ID and immediately
# serve a gRPC server on that stream ID.
#
# @param id [Integer] stream id
# @param services [Array<Class>] list of services to serve
def accept_and_serve(id, services)
s = GRPC::RpcServer.new
health_checker = Grpc::Health::Checker.new
port = s.add_http2_port("#{bind_addr}:0", :this_port_is_insecure)
services.each do |srv_klass|
s.handle(srv_klass.new)
health_checker.add_status(srv_klass,
Grpc::Health::V1::HealthCheckResponse::ServingStatus::SERVING)
end
s.handle(health_checker)
s.run_till_terminated_or_interrupted([1, 'int', 'SIGQUIT', 'SIGINT'])
end
end
end
end
end