| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726 | # Copyright 2015 gRPC authors.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.require 'spec_helper'def load_test_certs  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')  files = ['ca.pem', 'server1.key', 'server1.pem']  files.map { |f| File.open(File.join(test_root, f)).read }enddef check_md(wanted_md, received_md)  wanted_md.zip(received_md).each do |w, r|    w.each do |key, value|      expect(r[key]).to eq(value)    end  endend# A test service with no methods.class EmptyService  include GRPC::GenericServiceend# A test service without an implementation.class NoRpcImplementation  include GRPC::GenericService  rpc :an_rpc, EchoMsg, EchoMsgend# A test service with an implementation that fails with BadStatusclass FailingService  include GRPC::GenericService  rpc :an_rpc, EchoMsg, EchoMsg  attr_reader :details, :code, :md  def initialize(_default_var = 'ignored')    @details = 'app error'    @code = 101    @md = { 'failed_method' => 'an_rpc' }  end  def an_rpc(_req, _call)    fail GRPC::BadStatus.new(@code, @details, @md)  endendFailingStub = FailingService.rpc_stub_class# A slow test service.class SlowService  include GRPC::GenericService  rpc :an_rpc, EchoMsg, EchoMsg  attr_reader :received_md, :delay  def initialize(_default_var = 'ignored')    @delay = 0.25    @received_md = []  end  def an_rpc(req, call)    GRPC.logger.info("starting a slow #{@delay} rpc")    sleep @delay    @received_md << call.metadata unless call.metadata.nil?    req  # send back the req as the response  endendSlowStub = SlowService.rpc_stub_class# A test service that allows a synchronized RPC cancellationclass SynchronizedCancellationService  include GRPC::GenericService  rpc :an_rpc, EchoMsg, EchoMsg  attr_reader :received_md, :delay  # notify_request_received and wait_until_rpc_cancelled are  # callbacks to synchronously allow the client to proceed with  # cancellation (after the unary request has been received),  # and to synchronously wait until the client has cancelled the  # current RPC.  def initialize(notify_request_received, wait_until_rpc_cancelled)    @notify_request_received = notify_request_received    @wait_until_rpc_cancelled = wait_until_rpc_cancelled  end  def an_rpc(req, _call)    GRPC.logger.info('starting a synchronusly cancelled rpc')    @notify_request_received.call(req)    @wait_until_rpc_cancelled.call    req  # send back the req as the response  endendSynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class# a test service that hangs onto call objects# and uses them after the server-side call has been# finishedclass CheckCallAfterFinishedService  include GRPC::GenericService  rpc :an_rpc, EchoMsg, EchoMsg  rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg  rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)  rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)  attr_reader :server_side_call  def an_rpc(req, call)    fail 'shouldnt reuse service' unless @server_side_call.nil?    @server_side_call = call    req  end  def a_client_streaming_rpc(call)    fail 'shouldnt reuse service' unless @server_side_call.nil?    @server_side_call = call    # iterate through requests so call can complete    call.each_remote_read.each { |r| GRPC.logger.info(r) }    EchoMsg.new  end  def a_server_streaming_rpc(_, call)    fail 'shouldnt reuse service' unless @server_side_call.nil?    @server_side_call = call    [EchoMsg.new, EchoMsg.new]  end  def a_bidi_rpc(requests, call)    fail 'shouldnt reuse service' unless @server_side_call.nil?    @server_side_call = call    requests.each { |r| GRPC.logger.info(r) }    [EchoMsg.new, EchoMsg.new]  endendCheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class# A service with a bidi streaming method.class BidiService  include GRPC::GenericService  rpc :server_sends_bad_input, stream(EchoMsg), stream(EchoMsg)  def server_sends_bad_input(_, _)    'bad response. (not an enumerable, client sees an error)'  endendBidiStub = BidiService.rpc_stub_classdescribe GRPC::RpcServer do  RpcServer = GRPC::RpcServer  StatusCodes = GRPC::Core::StatusCodes  before(:each) do    @method = 'an_rpc_method'    @pass = 0    @fail = 1    @noop = proc { |x| x }  end  describe '#new' do    it 'can be created with just some args' do      opts = { server_args: { a_channel_arg: 'an_arg' } }      blk = proc do        new_rpc_server_for_testing(**opts)      end      expect(&blk).not_to raise_error    end    it 'cannot be created with invalid ServerCredentials' do      blk = proc do        opts = {          server_args: { a_channel_arg: 'an_arg' },          creds: Object.new        }        new_rpc_server_for_testing(**opts)      end      expect(&blk).to raise_error    end  end  describe '#stopped?' do    before(:each) do      opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1.5 }      @srv = new_rpc_server_for_testing(**opts)      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)    end    it 'starts out false' do      expect(@srv.stopped?).to be(false)    end    it 'stays false after the server starts running', server: true do      @srv.handle(EchoService)      t = Thread.new { @srv.run }      @srv.wait_till_running      expect(@srv.stopped?).to be(false)      @srv.stop      t.join    end    it 'is true after a running server is stopped', server: true do      @srv.handle(EchoService)      t = Thread.new { @srv.run }      @srv.wait_till_running      @srv.stop      t.join      expect(@srv.stopped?).to be(true)    end  end  describe '#running?' do    it 'starts out false' do      opts = {        server_args: { a_channel_arg: 'an_arg' }      }      r = new_rpc_server_for_testing(**opts)      expect(r.running?).to be(false)    end    it 'is false if run is called with no services registered', server: true do      opts = {        server_args: { a_channel_arg: 'an_arg' },        poll_period: 2      }      r = new_rpc_server_for_testing(**opts)      r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)      expect { r.run }.to raise_error(RuntimeError)    end    it 'is true after run is called with a registered service' do      opts = {        server_args: { a_channel_arg: 'an_arg' },        poll_period: 2.5      }      r = new_rpc_server_for_testing(**opts)      r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)      r.handle(EchoService)      t = Thread.new { r.run }      r.wait_till_running      expect(r.running?).to be(true)      r.stop      t.join    end  end  describe '#handle' do    before(:each) do      @opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1 }      @srv = new_rpc_server_for_testing(**@opts)      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)    end    it 'raises if #run has already been called' do      @srv.handle(EchoService)      t = Thread.new { @srv.run }      @srv.wait_till_running      expect { @srv.handle(EchoService) }.to raise_error      @srv.stop      t.join    end    it 'raises if the server has been run and stopped' do      @srv.handle(EchoService)      t = Thread.new { @srv.run }      @srv.wait_till_running      @srv.stop      t.join      expect { @srv.handle(EchoService) }.to raise_error    end    it 'raises if the service does not include GenericService ' do      expect { @srv.handle(Object) }.to raise_error    end    it 'raises if the service does not declare any rpc methods' do      expect { @srv.handle(EmptyService) }.to raise_error    end    it 'raises if a handler method is already registered' do      @srv.handle(EchoService)      expect { r.handle(EchoService) }.to raise_error    end  end  describe '#run' do    let(:client_opts) { { channel_override: @ch } }    let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc }    let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) }    context 'with no connect_metadata' do      before(:each) do        server_opts = {          poll_period: 1        }        @srv = new_rpc_server_for_testing(**server_opts)        server_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)        @host = "localhost:#{server_port}"        @ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)      end      it 'should return NOT_FOUND status on unknown methods', server: true do        @srv.handle(EchoService)        t = Thread.new { @srv.run }        @srv.wait_till_running        req = EchoMsg.new        blk = proc do          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,                                      **client_opts)          stub.request_response('/unknown', req, marshal, unmarshal)        end        expect(&blk).to raise_error GRPC::BadStatus        @srv.stop        t.join      end      it 'should return UNIMPLEMENTED on unimplemented methods', server: true do        @srv.handle(NoRpcImplementation)        t = Thread.new { @srv.run }        @srv.wait_till_running        req = EchoMsg.new        blk = proc do          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,                                      **client_opts)          stub.request_response('/an_rpc', req, marshal, unmarshal)        end        expect(&blk).to raise_error do |error|          expect(error).to be_a(GRPC::BadStatus)          expect(error.code).to be(GRPC::Core::StatusCodes::UNIMPLEMENTED)        end        @srv.stop        t.join      end      it 'should handle multiple sequential requests', server: true do        @srv.handle(EchoService)        t = Thread.new { @srv.run }        @srv.wait_till_running        req = EchoMsg.new        n = 5  # arbitrary        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)        n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) }        @srv.stop        t.join      end      it 'should receive metadata sent as rpc keyword args', server: true do        service = EchoService.new        @srv.handle(service)        t = Thread.new { @srv.run }        @srv.wait_till_running        req = EchoMsg.new        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)        expect(stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }))          .to be_a(EchoMsg)        wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]        check_md(wanted_md, service.received_md)        @srv.stop        t.join      end      it 'should receive metadata if a deadline is specified', server: true do        service = SlowService.new        @srv.handle(service)        t = Thread.new { @srv.run }        @srv.wait_till_running        req = EchoMsg.new        stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)        timeout = service.delay + 1.0        deadline = GRPC::Core::TimeConsts.from_relative_time(timeout)        resp = stub.an_rpc(req,                           deadline: deadline,                           metadata: { k1: 'v1', k2: 'v2' })        expect(resp).to be_a(EchoMsg)        wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]        check_md(wanted_md, service.received_md)        @srv.stop        t.join      end      it 'should handle cancellation correctly', server: true do        request_received = false        request_received_mu = Mutex.new        request_received_cv = ConditionVariable.new        notify_request_received = proc do |req|          request_received_mu.synchronize do            fail 'req is nil' if req.nil?            expect(req.is_a?(EchoMsg)).to be true            fail 'test bug - already set' if request_received            request_received = true            request_received_cv.signal          end        end        rpc_cancelled = false        rpc_cancelled_mu = Mutex.new        rpc_cancelled_cv = ConditionVariable.new        wait_until_rpc_cancelled = proc do          rpc_cancelled_mu.synchronize do            loop do              break if rpc_cancelled              rpc_cancelled_cv.wait(rpc_cancelled_mu)            end          end        end        service = SynchronizedCancellationService.new(notify_request_received,                                                      wait_until_rpc_cancelled)        @srv.handle(service)        srv_thd = Thread.new { @srv.run }        @srv.wait_till_running        req = EchoMsg.new        stub = SynchronizedCancellationStub.new(@host,                                                :this_channel_is_insecure,                                                **client_opts)        op = stub.an_rpc(req, return_op: true)        client_thd = Thread.new do          expect { op.execute }.to raise_error GRPC::Cancelled        end        request_received_mu.synchronize do          loop do            break if request_received            request_received_cv.wait(request_received_mu)          end        end        op.cancel        rpc_cancelled_mu.synchronize do          fail 'test bug - already set' if rpc_cancelled          rpc_cancelled = true          rpc_cancelled_cv.signal        end        client_thd.join        @srv.stop        srv_thd.join      end      it 'should handle multiple parallel requests', server: true do        @srv.handle(EchoService)        t = Thread.new { @srv.run }        @srv.wait_till_running        req, q = EchoMsg.new, Queue.new        n = 5  # arbitrary        threads = [t]        n.times do          threads << Thread.new do            stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)            q << stub.an_rpc(req)          end        end        n.times { expect(q.pop).to be_a(EchoMsg) }        @srv.stop        threads.each(&:join)      end      it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do        opts = {          server_args: { a_channel_arg: 'an_arg' },          pool_size: 2,          poll_period: 1,          max_waiting_requests: 1        }        alt_srv = new_rpc_server_for_testing(**opts)        alt_srv.handle(SlowService)        alt_port = alt_srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)        alt_host = "0.0.0.0:#{alt_port}"        t = Thread.new { alt_srv.run }        alt_srv.wait_till_running        req = EchoMsg.new        n = 20 # arbitrary, use as many to ensure the server pool is exceeded        threads = []        one_failed_as_unavailable = false        n.times do          threads << Thread.new do            stub = SlowStub.new(alt_host, :this_channel_is_insecure)            begin              stub.an_rpc(req)            rescue GRPC::ResourceExhausted              one_failed_as_unavailable = true            end          end        end        threads.each(&:join)        alt_srv.stop        t.join        expect(one_failed_as_unavailable).to be(true)      end      it 'should send a status UNKNOWN with a relevant message when the' \        'servers response stream is not an enumerable' do        @srv.handle(BidiService)        t = Thread.new { @srv.run }        @srv.wait_till_running        stub = BidiStub.new(@host, :this_channel_is_insecure, **client_opts)        responses = stub.server_sends_bad_input([])        exception = nil        begin          responses.each { |r| r }        rescue GRPC::Unknown => e          exception = e        end        # Erroneous responses sent from the server handler should cause an        # exception on the client with relevant info.        expected_details = 'NoMethodError: undefined method `each\' for '\          '"bad response. (not an enumerable, client sees an error)"'        expect(exception.inspect.include?(expected_details)).to be true        @srv.stop        t.join      end    end    context 'with connect metadata' do      let(:test_md_proc) do        proc do |mth, md|          res = md.clone          res['method'] = mth          res['connect_k1'] = 'connect_v1'          res        end      end      before(:each) do        server_opts = {          poll_period: 1,          connect_md_proc: test_md_proc        }        @srv = new_rpc_server_for_testing(**server_opts)        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)        @alt_host = "0.0.0.0:#{alt_port}"      end      it 'should send connect metadata to the client', server: true do        service = EchoService.new        @srv.handle(service)        t = Thread.new { @srv.run }        @srv.wait_till_running        req = EchoMsg.new        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)        op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true)        expect(op.metadata).to be nil        expect(op.execute).to be_a(EchoMsg)        wanted_md = {          'k1' => 'v1',          'k2' => 'v2',          'method' => '/EchoService/an_rpc',          'connect_k1' => 'connect_v1'        }        wanted_md.each do |key, value|          GRPC.logger.info("key: #{key}")          expect(op.metadata[key]).to eq(value)        end        @srv.stop        t.join      end    end    context 'with trailing metadata' do      before(:each) do        server_opts = {          poll_period: 1        }        @srv = new_rpc_server_for_testing(**server_opts)        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)        @alt_host = "0.0.0.0:#{alt_port}"      end      it 'should be added to BadStatus when requests fail', server: true do        service = FailingService.new        @srv.handle(service)        t = Thread.new { @srv.run }        @srv.wait_till_running        req = EchoMsg.new        stub = FailingStub.new(@alt_host, :this_channel_is_insecure)        blk = proc { stub.an_rpc(req) }        # confirm it raise the expected error        expect(&blk).to raise_error GRPC::BadStatus        # call again and confirm exception contained the trailing metadata.        begin          blk.call        rescue GRPC::BadStatus => e          expect(e.code).to eq(service.code)          expect(e.details).to eq(service.details)          expect(e.metadata).to eq(service.md)        end        @srv.stop        t.join      end      it 'should be received by the client', server: true do        wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' }        service = EchoService.new(k1: 'out_v1', k2: 'out_v2')        @srv.handle(service)        t = Thread.new { @srv.run }        @srv.wait_till_running        req = EchoMsg.new        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)        op = stub.an_rpc(req, return_op: true, metadata: { k1: 'v1', k2: 'v2' })        expect(op.metadata).to be nil        expect(op.execute).to be_a(EchoMsg)        expect(op.trailing_metadata).to eq(wanted_trailers)        @srv.stop        t.join      end    end    context 'when call objects are used after calls have completed' do      before(:each) do        server_opts = {          poll_period: 1        }        @srv = new_rpc_server_for_testing(**server_opts)        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)        @alt_host = "0.0.0.0:#{alt_port}"        @service = CheckCallAfterFinishedService.new        @srv.handle(@service)        @srv_thd  = Thread.new { @srv.run }        @srv.wait_till_running      end      # check that the server-side call is still in a usable state even      # after it has finished      def check_single_req_view_of_finished_call(call)        common_check_of_finished_server_call(call)        expect(call.peer).to be_a(String)        expect(call.peer_cert).to be(nil)      end      def check_multi_req_view_of_finished_call(call)        common_check_of_finished_server_call(call)        expect do          call.each_remote_read.each { |r| p r }        end.to raise_error(GRPC::Core::CallError)      end      def common_check_of_finished_server_call(call)        expect do          call.merge_metadata_to_send({})        end.to raise_error(RuntimeError)        expect do          call.send_initial_metadata        end.to_not raise_error        expect(call.cancelled?).to be(false)        expect(call.metadata).to be_a(Hash)        expect(call.metadata['user-agent']).to be_a(String)        expect(call.metadata_sent).to be(true)        expect(call.output_metadata).to eq({})        expect(call.metadata_to_send).to eq({})        expect(call.deadline.is_a?(Time)).to be(true)      end      it 'should not crash when call used after an unary call is finished' do        req = EchoMsg.new        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,                                                     :this_channel_is_insecure)        resp = stub.an_rpc(req)        expect(resp).to be_a(EchoMsg)        @srv.stop        @srv_thd.join        check_single_req_view_of_finished_call(@service.server_side_call)      end      it 'should not crash when call used after client streaming finished' do        requests = [EchoMsg.new, EchoMsg.new]        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,                                                     :this_channel_is_insecure)        resp = stub.a_client_streaming_rpc(requests)        expect(resp).to be_a(EchoMsg)        @srv.stop        @srv_thd.join        check_multi_req_view_of_finished_call(@service.server_side_call)      end      it 'should not crash when call used after server streaming finished' do        req = EchoMsg.new        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,                                                     :this_channel_is_insecure)        responses = stub.a_server_streaming_rpc(req)        responses.each do |r|          expect(r).to be_a(EchoMsg)        end        @srv.stop        @srv_thd.join        check_single_req_view_of_finished_call(@service.server_side_call)      end      it 'should not crash when call used after a bidi call is finished' do        requests = [EchoMsg.new, EchoMsg.new]        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,                                                     :this_channel_is_insecure)        responses = stub.a_bidi_rpc(requests)        responses.each do |r|          expect(r).to be_a(EchoMsg)        end        @srv.stop        @srv_thd.join        check_multi_req_view_of_finished_call(@service.server_side_call)      end    end  endend
 |