Expand broker functionality to include server implementation
This commit is contained in:
parent
f670b7ca5b
commit
b6a0ed111a
@ -9,6 +9,14 @@ module VagrantPlugins
|
||||
class Broker
|
||||
include Singleton
|
||||
|
||||
# Broker specific errors
|
||||
class Error < StandardError
|
||||
class StreamTimeout < Error; end
|
||||
end
|
||||
|
||||
# Maximum number of seconds to wait for receiving stream connection information
|
||||
STREAM_WAIT_TIMEOUT = 5
|
||||
|
||||
# Protobuf connection information
|
||||
Info = ::Plugin::ConnInfo
|
||||
|
||||
@ -36,16 +44,32 @@ module VagrantPlugins
|
||||
# is passed to our internal broker to track streams
|
||||
# for internal use.
|
||||
class Streamer < ::Plugin::GRPCBroker::Service
|
||||
# @return [Broker]
|
||||
attr_reader :broker
|
||||
|
||||
# Create a new streamer service
|
||||
#
|
||||
# @param broker [Broker] broker to register requests
|
||||
# @return [self]
|
||||
def initialize(broker:)
|
||||
super
|
||||
@broker = broker
|
||||
end
|
||||
|
||||
# Handle stream requests which include connection
|
||||
# information for stream IDs
|
||||
def start_stream(reqs, x)
|
||||
reqs.map do |req|
|
||||
Broker.instance.register(req)
|
||||
broker.register(req)
|
||||
end
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
||||
# Create a new broker
|
||||
def initialize
|
||||
# Create a server side broker
|
||||
def initialize(bind:, ports:)
|
||||
@bind = bind
|
||||
@ports = ports
|
||||
@streams_m = Mutex.new
|
||||
@streams = {}
|
||||
end
|
||||
@ -73,8 +97,6 @@ module VagrantPlugins
|
||||
# @note If stream information has not be received
|
||||
# for the requested ID it will wait for the
|
||||
# info.
|
||||
# TODO(spox): Should we add a timeout here similar
|
||||
# to within go-plugin (hard coded 5s).
|
||||
def dial(id)
|
||||
catch(:found) do
|
||||
@streams_m.synchronize do
|
||||
@ -83,11 +105,57 @@ module VagrantPlugins
|
||||
if v.nil?
|
||||
v = @streams[id] = ConditionVariable.new
|
||||
end
|
||||
v.wait(@streams_m)
|
||||
v.wait(@streams_m, STREAM_WAIT_TIMEOUT)
|
||||
if v == @streams[id]
|
||||
raise Error::StreamTimeout,
|
||||
"Failed to receive connection information for stream `#{id}'"
|
||||
end
|
||||
@streams[id]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def client_for(stub)
|
||||
end
|
||||
|
||||
class Client
|
||||
|
||||
# Create a new broker
|
||||
def initialize(client:)
|
||||
@stream_id_m = Mutex.new
|
||||
@stream_id = 0
|
||||
@servers_m = Mutex.new
|
||||
@servers = {}
|
||||
end
|
||||
|
||||
# @return [Integer] next stream id to use
|
||||
def next_id
|
||||
@stream_id_m.synchronize do
|
||||
@stream_id += 1
|
||||
end
|
||||
end
|
||||
|
||||
# Accept a specific stream ID and immediately
|
||||
# serve a gRPC server on that stream ID.
|
||||
#
|
||||
# @param id [Integer] stream id
|
||||
# @param services [Array<Class>] list of services to serve
|
||||
def accept_and_serve(id, services)
|
||||
s = GRPC::RpcServer.new
|
||||
health_checker = Grpc::Health::Checker.new
|
||||
port = s.add_http2_port("#{bind_addr}:0", :this_port_is_insecure)
|
||||
services.each do |srv_klass|
|
||||
s.handle(srv_klass.new)
|
||||
health_checker.add_status(srv_klass,
|
||||
Grpc::Health::V1::HealthCheckResponse::ServingStatus::SERVING)
|
||||
end
|
||||
s.handle(health_checker)
|
||||
s.run_till_terminated_or_interrupted([1, 'int', 'SIGQUIT', 'SIGINT'])
|
||||
end
|
||||
|
||||
## Client side ##
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user