From b6a0ed111accb2a85c2c10e6ad528a0b898de078 Mon Sep 17 00:00:00 2001 From: Chris Roberts Date: Wed, 21 Apr 2021 16:02:00 -0700 Subject: [PATCH] Expand broker functionality to include server implementation --- plugins/commands/serve/broker.rb | 80 +++++++++++++++++++++++++++++--- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/plugins/commands/serve/broker.rb b/plugins/commands/serve/broker.rb index 17cf3a57e..32e4845f8 100644 --- a/plugins/commands/serve/broker.rb +++ b/plugins/commands/serve/broker.rb @@ -9,6 +9,14 @@ module VagrantPlugins class Broker include Singleton + # Broker specific errors + class Error < StandardError + class StreamTimeout < Error; end + end + + # Maximum number of seconds to wait for receiving stream connection information + STREAM_WAIT_TIMEOUT = 5 + # Protobuf connection information Info = ::Plugin::ConnInfo @@ -36,16 +44,32 @@ module VagrantPlugins # is passed to our internal broker to track streams # for internal use. class Streamer < ::Plugin::GRPCBroker::Service + # @return [Broker] + attr_reader :broker + + # Create a new streamer service + # + # @param broker [Broker] broker to register requests + # @return [self] + def initialize(broker:) + super + @broker = broker + end + + # Handle stream requests which include connection + # information for stream IDs def start_stream(reqs, x) reqs.map do |req| - Broker.instance.register(req) + broker.register(req) end nil end end - # Create a new broker - def initialize + # Create a server side broker + def initialize(bind:, ports:) + @bind = bind + @ports = ports @streams_m = Mutex.new @streams = {} end @@ -73,8 +97,6 @@ module VagrantPlugins # @note If stream information has not be received # for the requested ID it will wait for the # info. - # TODO(spox): Should we add a timeout here similar - # to within go-plugin (hard coded 5s). def dial(id) catch(:found) do @streams_m.synchronize do @@ -83,11 +105,57 @@ module VagrantPlugins if v.nil? v = @streams[id] = ConditionVariable.new end - v.wait(@streams_m) + v.wait(@streams_m, STREAM_WAIT_TIMEOUT) + if v == @streams[id] + raise Error::StreamTimeout, + "Failed to receive connection information for stream `#{id}'" + end @streams[id] end end end + + def client_for(stub) + end + + class Client + + # Create a new broker + def initialize(client:) + @stream_id_m = Mutex.new + @stream_id = 0 + @servers_m = Mutex.new + @servers = {} + end + + # @return [Integer] next stream id to use + def next_id + @stream_id_m.synchronize do + @stream_id += 1 + end + end + + # Accept a specific stream ID and immediately + # serve a gRPC server on that stream ID. + # + # @param id [Integer] stream id + # @param services [Array] list of services to serve + def accept_and_serve(id, services) + s = GRPC::RpcServer.new + health_checker = Grpc::Health::Checker.new + port = s.add_http2_port("#{bind_addr}:0", :this_port_is_insecure) + services.each do |srv_klass| + s.handle(srv_klass.new) + health_checker.add_status(srv_klass, + Grpc::Health::V1::HealthCheckResponse::ServingStatus::SERVING) + end + s.handle(health_checker) + s.run_till_terminated_or_interrupted([1, 'int', 'SIGQUIT', 'SIGINT']) + end + + ## Client side ## + + end end end end