| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 | 
							- # Copyright 2015-2016, Google Inc.
 
- # All rights reserved.
 
- #
 
- # Redistribution and use in source and binary forms, with or without
 
- # modification, are permitted provided that the following conditions are
 
- # met:
 
- #
 
- #     * Redistributions of source code must retain the above copyright
 
- # notice, this list of conditions and the following disclaimer.
 
- #     * Redistributions in binary form must reproduce the above
 
- # copyright notice, this list of conditions and the following disclaimer
 
- # in the documentation and/or other materials provided with the
 
- # distribution.
 
- #     * Neither the name of Google Inc. nor the names of its
 
- # contributors may be used to endorse or promote products derived from
 
- # this software without specific prior written permission.
 
- #
 
- # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
- # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
- # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 
- # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 
- # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
- # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 
- # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
- # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
- # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
- # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 
- # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
- require 'grpc'
 
- def load_test_certs
 
-   test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
 
-   files = ['ca.pem', 'server1.key', 'server1.pem']
 
-   files.map { |f| File.open(File.join(test_root, f)).read }
 
- end
 
- def check_md(wanted_md, received_md)
 
-   wanted_md.zip(received_md).each do |w, r|
 
-     w.each do |key, value|
 
-       expect(r[key]).to eq(value)
 
-     end
 
-   end
 
- end
 
- # A test message
 
- class EchoMsg
 
-   def self.marshal(_o)
 
-     ''
 
-   end
 
-   def self.unmarshal(_o)
 
-     EchoMsg.new
 
-   end
 
- end
 
- # A test service with no methods.
 
- class EmptyService
 
-   include GRPC::GenericService
 
- end
 
- # A test service without an implementation.
 
- class NoRpcImplementation
 
-   include GRPC::GenericService
 
-   rpc :an_rpc, EchoMsg, EchoMsg
 
- end
 
- # A test service with an echo implementation.
 
- class EchoService
 
-   include GRPC::GenericService
 
-   rpc :an_rpc, EchoMsg, EchoMsg
 
-   attr_reader :received_md
 
-   def initialize(**kw)
 
-     @trailing_metadata = kw
 
-     @received_md = []
 
-   end
 
-   def an_rpc(req, call)
 
-     GRPC.logger.info('echo service received a request')
 
-     call.output_metadata.update(@trailing_metadata)
 
-     @received_md << call.metadata unless call.metadata.nil?
 
-     req
 
-   end
 
- end
 
- EchoStub = EchoService.rpc_stub_class
 
- # A test service with an implementation that fails with BadStatus
 
- class FailingService
 
-   include GRPC::GenericService
 
-   rpc :an_rpc, EchoMsg, EchoMsg
 
-   attr_reader :details, :code, :md
 
-   def initialize(_default_var = 'ignored')
 
-     @details = 'app error'
 
-     @code = 101
 
-     @md = { failed_method: 'an_rpc' }
 
-   end
 
-   def an_rpc(_req, _call)
 
-     fail GRPC::BadStatus.new(@code, @details, **@md)
 
-   end
 
- end
 
- FailingStub = FailingService.rpc_stub_class
 
- # A slow test service.
 
- class SlowService
 
-   include GRPC::GenericService
 
-   rpc :an_rpc, EchoMsg, EchoMsg
 
-   attr_reader :received_md, :delay
 
-   def initialize(_default_var = 'ignored')
 
-     @delay = 0.25
 
-     @received_md = []
 
-   end
 
-   def an_rpc(req, call)
 
-     GRPC.logger.info("starting a slow #{@delay} rpc")
 
-     sleep @delay
 
-     @received_md << call.metadata unless call.metadata.nil?
 
-     req  # send back the req as the response
 
-   end
 
- end
 
- SlowStub = SlowService.rpc_stub_class
 
- describe GRPC::RpcServer do
 
-   RpcServer = GRPC::RpcServer
 
-   StatusCodes = GRPC::Core::StatusCodes
 
-   before(:each) do
 
-     @method = 'an_rpc_method'
 
-     @pass = 0
 
-     @fail = 1
 
-     @noop = proc { |x| x }
 
-     @server_queue = GRPC::Core::CompletionQueue.new
 
-     server_host = '0.0.0.0:0'
 
-     @server = GRPC::Core::Server.new(@server_queue, nil)
 
-     server_port = @server.add_http2_port(server_host, :this_port_is_insecure)
 
-     @host = "localhost:#{server_port}"
 
-     @ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)
 
-   end
 
-   describe '#new' do
 
-     it 'can be created with just some args' do
 
-       opts = { a_channel_arg: 'an_arg' }
 
-       blk = proc do
 
-         RpcServer.new(**opts)
 
-       end
 
-       expect(&blk).not_to raise_error
 
-     end
 
-     it 'can be created with a default deadline' do
 
-       opts = { a_channel_arg: 'an_arg', deadline: 5 }
 
-       blk = proc do
 
-         RpcServer.new(**opts)
 
-       end
 
-       expect(&blk).not_to raise_error
 
-     end
 
-     it 'can be created with a completion queue override' do
 
-       opts = {
 
-         a_channel_arg: 'an_arg',
 
-         completion_queue_override: @server_queue
 
-       }
 
-       blk = proc do
 
-         RpcServer.new(**opts)
 
-       end
 
-       expect(&blk).not_to raise_error
 
-     end
 
-     it 'cannot be created with a bad completion queue override' do
 
-       blk = proc do
 
-         opts = {
 
-           a_channel_arg: 'an_arg',
 
-           completion_queue_override: Object.new
 
-         }
 
-         RpcServer.new(**opts)
 
-       end
 
-       expect(&blk).to raise_error
 
-     end
 
-     it 'cannot be created with invalid ServerCredentials' do
 
-       blk = proc do
 
-         opts = {
 
-           a_channel_arg: 'an_arg',
 
-           creds: Object.new
 
-         }
 
-         RpcServer.new(**opts)
 
-       end
 
-       expect(&blk).to raise_error
 
-     end
 
-     it 'can be created with a server override' do
 
-       opts = { a_channel_arg: 'an_arg', server_override: @server }
 
-       blk = proc do
 
-         RpcServer.new(**opts)
 
-       end
 
-       expect(&blk).not_to raise_error
 
-     end
 
-     it 'cannot be created with a bad server override' do
 
-       blk = proc do
 
-         opts = {
 
-           a_channel_arg: 'an_arg',
 
-           server_override: Object.new
 
-         }
 
-         RpcServer.new(**opts)
 
-       end
 
-       expect(&blk).to raise_error
 
-     end
 
-   end
 
-   describe '#stopped?' do
 
-     before(:each) do
 
-       opts = { a_channel_arg: 'an_arg', poll_period: 1.5 }
 
-       @srv = RpcServer.new(**opts)
 
-     end
 
-     it 'starts out false' do
 
-       expect(@srv.stopped?).to be(false)
 
-     end
 
-     it 'stays false after the server starts running', server: true do
 
-       @srv.handle(EchoService)
 
-       t = Thread.new { @srv.run }
 
-       @srv.wait_till_running
 
-       expect(@srv.stopped?).to be(false)
 
-       @srv.stop
 
-       t.join
 
-     end
 
-     it 'is true after a running server is stopped', server: true do
 
-       @srv.handle(EchoService)
 
-       t = Thread.new { @srv.run }
 
-       @srv.wait_till_running
 
-       @srv.stop
 
-       t.join
 
-       expect(@srv.stopped?).to be(true)
 
-     end
 
-   end
 
-   describe '#running?' do
 
-     it 'starts out false' do
 
-       opts = { a_channel_arg: 'an_arg', server_override: @server }
 
-       r = RpcServer.new(**opts)
 
-       expect(r.running?).to be(false)
 
-     end
 
-     it 'is false if run is called with no services registered', server: true do
 
-       opts = {
 
-         a_channel_arg: 'an_arg',
 
-         poll_period: 2,
 
-         server_override: @server
 
-       }
 
-       r = RpcServer.new(**opts)
 
-       expect { r.run }.to raise_error(RuntimeError)
 
-     end
 
-     it 'is true after run is called with a registered service' do
 
-       opts = {
 
-         a_channel_arg: 'an_arg',
 
-         poll_period: 2.5,
 
-         server_override: @server
 
-       }
 
-       r = RpcServer.new(**opts)
 
-       r.handle(EchoService)
 
-       t = Thread.new { r.run }
 
-       r.wait_till_running
 
-       expect(r.running?).to be(true)
 
-       r.stop
 
-       t.join
 
-     end
 
-   end
 
-   describe '#handle' do
 
-     before(:each) do
 
-       @opts = { a_channel_arg: 'an_arg', poll_period: 1 }
 
-       @srv = RpcServer.new(**@opts)
 
-     end
 
-     it 'raises if #run has already been called' do
 
-       @srv.handle(EchoService)
 
-       t = Thread.new { @srv.run }
 
-       @srv.wait_till_running
 
-       expect { @srv.handle(EchoService) }.to raise_error
 
-       @srv.stop
 
-       t.join
 
-     end
 
-     it 'raises if the server has been run and stopped' do
 
-       @srv.handle(EchoService)
 
-       t = Thread.new { @srv.run }
 
-       @srv.wait_till_running
 
-       @srv.stop
 
-       t.join
 
-       expect { @srv.handle(EchoService) }.to raise_error
 
-     end
 
-     it 'raises if the service does not include GenericService ' do
 
-       expect { @srv.handle(Object) }.to raise_error
 
-     end
 
-     it 'raises if the service does not declare any rpc methods' do
 
-       expect { @srv.handle(EmptyService) }.to raise_error
 
-     end
 
-     it 'raises if the service does not define its rpc methods' do
 
-       expect { @srv.handle(NoRpcImplementation) }.to raise_error
 
-     end
 
-     it 'raises if a handler method is already registered' do
 
-       @srv.handle(EchoService)
 
-       expect { r.handle(EchoService) }.to raise_error
 
-     end
 
-   end
 
-   describe '#run' do
 
-     let(:client_opts) { { channel_override: @ch } }
 
-     let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc }
 
-     let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) }
 
-     context 'with no connect_metadata' do
 
-       before(:each) do
 
-         server_opts = {
 
-           server_override: @server,
 
-           completion_queue_override: @server_queue,
 
-           poll_period: 1
 
-         }
 
-         @srv = RpcServer.new(**server_opts)
 
-       end
 
-       it 'should return NOT_FOUND status on unknown methods', server: true do
 
-         @srv.handle(EchoService)
 
-         t = Thread.new { @srv.run }
 
-         @srv.wait_till_running
 
-         req = EchoMsg.new
 
-         blk = proc do
 
-           cq = GRPC::Core::CompletionQueue.new
 
-           stub = GRPC::ClientStub.new(@host, cq, :this_channel_is_insecure,
 
-                                       **client_opts)
 
-           stub.request_response('/unknown', req, marshal, unmarshal)
 
-         end
 
-         expect(&blk).to raise_error GRPC::BadStatus
 
-         @srv.stop
 
-         t.join
 
-       end
 
-       it 'should handle multiple sequential requests', server: true do
 
-         @srv.handle(EchoService)
 
-         t = Thread.new { @srv.run }
 
-         @srv.wait_till_running
 
-         req = EchoMsg.new
 
-         n = 5  # arbitrary
 
-         stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
 
-         n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) }
 
-         @srv.stop
 
-         t.join
 
-       end
 
-       it 'should receive metadata sent as rpc keyword args', server: true do
 
-         service = EchoService.new
 
-         @srv.handle(service)
 
-         t = Thread.new { @srv.run }
 
-         @srv.wait_till_running
 
-         req = EchoMsg.new
 
-         stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
 
-         expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
 
-         wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
 
-         check_md(wanted_md, service.received_md)
 
-         @srv.stop
 
-         t.join
 
-       end
 
-       it 'should receive metadata if a deadline is specified', server: true do
 
-         service = SlowService.new
 
-         @srv.handle(service)
 
-         t = Thread.new { @srv.run }
 
-         @srv.wait_till_running
 
-         req = EchoMsg.new
 
-         stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
 
-         timeout = service.delay + 1.0 # wait for long enough
 
-         resp = stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2')
 
-         expect(resp).to be_a(EchoMsg)
 
-         wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
 
-         check_md(wanted_md, service.received_md)
 
-         @srv.stop
 
-         t.join
 
-       end
 
-       it 'should handle cancellation correctly', server: true do
 
-         service = SlowService.new
 
-         @srv.handle(service)
 
-         t = Thread.new { @srv.run }
 
-         @srv.wait_till_running
 
-         req = EchoMsg.new
 
-         stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
 
-         op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
 
-         Thread.new do  # cancel the call
 
-           sleep 0.1
 
-           op.cancel
 
-         end
 
-         expect { op.execute }.to raise_error GRPC::Cancelled
 
-         @srv.stop
 
-         t.join
 
-       end
 
-       it 'should handle multiple parallel requests', server: true do
 
-         @srv.handle(EchoService)
 
-         t = Thread.new { @srv.run }
 
-         @srv.wait_till_running
 
-         req, q = EchoMsg.new, Queue.new
 
-         n = 5  # arbitrary
 
-         threads = [t]
 
-         n.times do
 
-           threads << Thread.new do
 
-             stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
 
-             q << stub.an_rpc(req)
 
-           end
 
-         end
 
-         n.times { expect(q.pop).to be_a(EchoMsg) }
 
-         @srv.stop
 
-         threads.each(&:join)
 
-       end
 
-       it 'should return UNAVAILABLE on too many jobs', server: true do
 
-         opts = {
 
-           a_channel_arg: 'an_arg',
 
-           server_override: @server,
 
-           completion_queue_override: @server_queue,
 
-           pool_size: 1,
 
-           poll_period: 1,
 
-           max_waiting_requests: 0
 
-         }
 
-         alt_srv = RpcServer.new(**opts)
 
-         alt_srv.handle(SlowService)
 
-         t = Thread.new { alt_srv.run }
 
-         alt_srv.wait_till_running
 
-         req = EchoMsg.new
 
-         n = 5  # arbitrary, use as many to ensure the server pool is exceeded
 
-         threads = []
 
-         one_failed_as_unavailable = false
 
-         n.times do
 
-           threads << Thread.new do
 
-             stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
 
-             begin
 
-               stub.an_rpc(req)
 
-             rescue GRPC::BadStatus => e
 
-               one_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE
 
-             end
 
-           end
 
-         end
 
-         threads.each(&:join)
 
-         alt_srv.stop
 
-         t.join
 
-         expect(one_failed_as_unavailable).to be(true)
 
-       end
 
-     end
 
-     context 'with connect metadata' do
 
-       let(:test_md_proc) do
 
-         proc do |mth, md|
 
-           res = md.clone
 
-           res['method'] = mth
 
-           res['connect_k1'] = 'connect_v1'
 
-           res
 
-         end
 
-       end
 
-       before(:each) do
 
-         server_opts = {
 
-           server_override: @server,
 
-           completion_queue_override: @server_queue,
 
-           poll_period: 1,
 
-           connect_md_proc: test_md_proc
 
-         }
 
-         @srv = RpcServer.new(**server_opts)
 
-       end
 
-       it 'should send connect metadata to the client', server: true do
 
-         service = EchoService.new
 
-         @srv.handle(service)
 
-         t = Thread.new { @srv.run }
 
-         @srv.wait_till_running
 
-         req = EchoMsg.new
 
-         stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
 
-         op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
 
-         expect(op.metadata).to be nil
 
-         expect(op.execute).to be_a(EchoMsg)
 
-         wanted_md = {
 
-           'k1' => 'v1',
 
-           'k2' => 'v2',
 
-           'method' => '/EchoService/an_rpc',
 
-           'connect_k1' => 'connect_v1'
 
-         }
 
-         wanted_md.each do |key, value|
 
-           expect(op.metadata[key]).to eq(value)
 
-         end
 
-         @srv.stop
 
-         t.join
 
-       end
 
-     end
 
-     context 'with trailing metadata' do
 
-       before(:each) do
 
-         server_opts = {
 
-           server_override: @server,
 
-           completion_queue_override: @server_queue,
 
-           poll_period: 1
 
-         }
 
-         @srv = RpcServer.new(**server_opts)
 
-       end
 
-       it 'should be added to BadStatus when requests fail', server: true do
 
-         service = FailingService.new
 
-         @srv.handle(service)
 
-         t = Thread.new { @srv.run }
 
-         @srv.wait_till_running
 
-         req = EchoMsg.new
 
-         stub = FailingStub.new(@host, :this_channel_is_insecure, **client_opts)
 
-         blk = proc { stub.an_rpc(req) }
 
-         # confirm it raise the expected error
 
-         expect(&blk).to raise_error GRPC::BadStatus
 
-         # call again and confirm exception contained the trailing metadata.
 
-         begin
 
-           blk.call
 
-         rescue GRPC::BadStatus => e
 
-           expect(e.code).to eq(service.code)
 
-           expect(e.details).to eq(service.details)
 
-           expect(e.metadata).to eq(service.md)
 
-         end
 
-         @srv.stop
 
-         t.join
 
-       end
 
-       it 'should be received by the client', server: true do
 
-         wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' }
 
-         service = EchoService.new(k1: 'out_v1', k2: 'out_v2')
 
-         @srv.handle(service)
 
-         t = Thread.new { @srv.run }
 
-         @srv.wait_till_running
 
-         req = EchoMsg.new
 
-         stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
 
-         op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
 
-         expect(op.metadata).to be nil
 
-         expect(op.execute).to be_a(EchoMsg)
 
-         expect(op.metadata).to eq(wanted_trailers)
 
-         @srv.stop
 
-         t.join
 
-       end
 
-     end
 
-   end
 
- end
 
 
  |