# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

require 'grpc'

# Runs +blk+ on a new thread, handing it a GRPC::Notifier, and blocks the
# caller on that notifier's #wait before returning. The block is therefore
# expected to notify once it is ready.
#
# @yieldparam n [GRPC::Notifier] notifier the caller is waiting on
# @return [Thread] the thread running the block
def wakey_thread(&blk)
  n = GRPC::Notifier.new
  t = Thread.new do
    blk.call(n)
  end
  n.wait
  t
end

# Reads the test certificate files from the 'testdata' directory that sits
# next to this file's parent directory.
#
# @return [Array<String>] contents of ca.pem, server1.key and server1.pem,
#   in that order
def load_test_certs
  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
  files = ['ca.pem', 'server1.key', 'server1.pem']
  # File.read closes the file once it has been read, so no handle is left open.
  files.map { |f| File.read(File.join(test_root, f)) }
end

include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
include GRPC::Core::CallOps

describe 'ClientStub' do
  # Identity proc passed in the two callable argument positions of the stub
  # and ActiveCall calls below (presumably marshal/unmarshal -- TODO confirm
  # against ClientStub).
  let(:noop) { proc { |x| x } }

  before(:each) do
    # A failure on a fake-server thread should abort the run instead of
    # being lost on that thread.
    Thread.abort_on_exception = true
    @server = nil
    @server_queue = nil
    @method = 'an_rpc_method'
    @pass = OK
    @fail = INTERNAL
    @cq = GRPC::Core::CompletionQueue.new
  end

  after(:each) do
    # Only examples that called create_test_server have a server to close.
    @server.close(@server_queue) unless @server_queue.nil?
  end

  describe '#new' do
    let(:fake_host) { 'localhost:0' }

    it 'can be created from a host and args' do
      opts = { a_channel_arg: 'an_arg' }
      blk = proc do
        GRPC::ClientStub.new(fake_host, @cq, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'can be created with a default deadline' do
      opts = { a_channel_arg: 'an_arg', deadline: 5 }
      blk = proc do
        GRPC::ClientStub.new(fake_host, @cq, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'can be created with an channel override' do
      # Pass an actual channel so the override path is what gets exercised;
      # an unassigned (nil) override would presumably be treated the same as
      # no override at all -- TODO confirm against ClientStub.
      ch = GRPC::Core::Channel.new(fake_host, nil)
      opts = { a_channel_arg: 'an_arg', channel_override: ch }
      blk = proc do
        GRPC::ClientStub.new(fake_host, @cq, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'cannot be created with a bad channel override' do
      blk = proc do
        opts = { a_channel_arg: 'an_arg', channel_override: Object.new }
        GRPC::ClientStub.new(fake_host, @cq, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'cannot be created with bad credentials' do
      blk = proc do
        opts = { a_channel_arg: 'an_arg', creds: Object.new }
        GRPC::ClientStub.new(fake_host, @cq, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'can be created with test test credentials' do
      certs = load_test_certs
      blk = proc do
        opts = {
          GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
          a_channel_arg: 'an_arg',
          # certs[0] is the contents of ca.pem (see load_test_certs).
          creds: GRPC::Core::Credentials.new(certs[0], nil, nil)
        }
        GRPC::ClientStub.new(fake_host, @cq, **opts)
      end
      expect(&blk).not_to raise_error
    end
  end

  describe '#request_response' do
    before(:each) do
      @sent_msg = 'a_msg'
      @resp = 'a_reply'
    end

    # Including groups must define get_response(stub).
    shared_examples 'request response' do
      it 'should send a request to/receive a reply from a server' do
        server_port = create_test_server
        th = run_request_response(@sent_msg, @resp, @pass)
        stub = GRPC::ClientStub.new("localhost:#{server_port}", @cq)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should send metadata to the server ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass,
                                  k1: 'v1', k2: 'v2')
        stub = GRPC::ClientStub.new(host, @cq)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should update the sent metadata with a provided metadata updater' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass,
                                  k1: 'updated-v1', k2: 'v2')
        update_md = proc do |md|
          md[:k1] = 'updated-v1'
          md
        end
        stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should send a request when configured using an override channel' do
        server_port = create_test_server
        alt_host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass)
        ch = GRPC::Core::Channel.new(alt_host, nil)
        stub = GRPC::ClientStub.new('ignored-host', @cq, channel_override: ch)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should raise an error if the status is not OK' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @fail)
        stub = GRPC::ClientStub.new(host, @cq)
        blk = proc { get_response(stub) }
        expect(&blk).to raise_error(GRPC::BadStatus)
        th.join
      end
    end

    describe 'without a call operation' do
      def get_response(stub)
        stub.request_response(@method, @sent_msg, noop, noop,
                              k1: 'v1', k2: 'v2')
      end

      it_behaves_like 'request response'
    end

    describe 'via a call operation' do
      def get_response(stub)
        op = stub.request_response(@method, @sent_msg, noop, noop,
                                   return_op: true, k1: 'v1', k2: 'v2')
        expect(op).to be_a(GRPC::ActiveCall::Operation)
        op.execute
      end

      it_behaves_like 'request response'
    end
  end

  describe '#client_streamer' do
    # Including groups must define get_response(stub).
    shared_examples 'client streaming' do
      before(:each) do
        @sent_msgs = Array.new(3) { |i| "msg_#{i + 1}" }
        @resp = 'a_reply'
      end

      it 'should send requests to/receive a reply from a server' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_client_streamer(@sent_msgs, @resp, @pass)
        stub = GRPC::ClientStub.new(host, @cq)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should send metadata to the server ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_client_streamer(@sent_msgs, @resp, @pass,
                                 k1: 'v1', k2: 'v2')
        stub = GRPC::ClientStub.new(host, @cq)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should update the sent metadata with a provided metadata updater' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_client_streamer(@sent_msgs, @resp, @pass,
                                 k1: 'updated-v1', k2: 'v2')
        update_md = proc do |md|
          md[:k1] = 'updated-v1'
          md
        end
        stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should raise an error if the status is not ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_client_streamer(@sent_msgs, @resp, @fail)
        stub = GRPC::ClientStub.new(host, @cq)
        blk = proc { get_response(stub) }
        expect(&blk).to raise_error(GRPC::BadStatus)
        th.join
      end
    end

    describe 'without a call operation' do
      def get_response(stub)
        stub.client_streamer(@method, @sent_msgs, noop, noop,
                             k1: 'v1', k2: 'v2')
      end

      it_behaves_like 'client streaming'
    end

    describe 'via a call operation' do
      def get_response(stub)
        op = stub.client_streamer(@method, @sent_msgs, noop, noop,
                                  return_op: true, k1: 'v1', k2: 'v2')
        expect(op).to be_a(GRPC::ActiveCall::Operation)
        op.execute
      end

      it_behaves_like 'client streaming'
    end
  end

  describe '#server_streamer' do
    # Including groups must define get_responses(stub), returning an Enumerator.
    shared_examples 'server streaming' do
      before(:each) do
        @sent_msg = 'a_msg'
        @replys = Array.new(3) { |i| "reply_#{i + 1}" }
      end

      it 'should send a request to/receive replies from a server' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @pass)
        stub = GRPC::ClientStub.new(host, @cq)
        expect(get_responses(stub).to_a).to eq(@replys)
        th.join
      end

      it 'should raise an error if the status is not ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @fail)
        stub = GRPC::ClientStub.new(host, @cq)
        e = get_responses(stub)
        expect { e.to_a }.to raise_error(GRPC::BadStatus)
        th.join
      end

      # NOTE(review): despite its name, this example sends a failing status
      # and expects BadStatus -- confirm whether that is intentional.
      it 'should send metadata to the server ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @fail,
                                 k1: 'v1', k2: 'v2')
        stub = GRPC::ClientStub.new(host, @cq)
        e = get_responses(stub)
        expect { e.to_a }.to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should update the sent metadata with a provided metadata updater' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @pass,
                                 k1: 'updated-v1', k2: 'v2')
        update_md = proc do |md|
          md[:k1] = 'updated-v1'
          md
        end
        stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
        e = get_responses(stub)
        expect(e.to_a).to eq(@replys)
        th.join
      end
    end

    describe 'without a call operation' do
      def get_responses(stub)
        e = stub.server_streamer(@method, @sent_msg, noop, noop,
                                 k1: 'v1', k2: 'v2')
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'server streaming'
    end

    describe 'via a call operation' do
      def get_responses(stub)
        op = stub.server_streamer(@method, @sent_msg, noop, noop,
                                  return_op: true, k1: 'v1', k2: 'v2')
        expect(op).to be_a(GRPC::ActiveCall::Operation)
        e = op.execute
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'server streaming'
    end
  end

  describe '#bidi_streamer' do
    # Including groups must define get_responses(stub), returning an Enumerator.
    shared_examples 'bidi streaming' do
      before(:each) do
        @sent_msgs = Array.new(3) { |i| "msg_#{i + 1}" }
        @replys = Array.new(3) { |i| "reply_#{i + 1}" }
        server_port = create_test_server
        @host = "localhost:#{server_port}"
      end

      it 'supports sending all the requests first', bidi: true do
        th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
                                                   @pass)
        stub = GRPC::ClientStub.new(@host, @cq)
        e = get_responses(stub)
        expect(e.to_a).to eq(@replys)
        th.join
      end

      it 'supports client-initiated ping pong', bidi: true do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
        stub = GRPC::ClientStub.new(@host, @cq)
        e = get_responses(stub)
        expect(e.to_a).to eq(@sent_msgs)
        th.join
      end

      it 'supports a server-initiated ping pong', bidi: true do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
        stub = GRPC::ClientStub.new(@host, @cq)
        e = get_responses(stub)
        expect(e.to_a).to eq(@sent_msgs)
        th.join
      end
    end

    describe 'without a call operation' do
      def get_responses(stub)
        e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'bidi streaming'
    end

    describe 'via a call operation' do
      def get_responses(stub)
        op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
                                return_op: true)
        expect(op).to be_a(GRPC::ActiveCall::Operation)
        e = op.execute
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'bidi streaming'
    end
  end

  # Fake server side of a server-streaming call: checks the received
  # metadata and the single request, sends every reply, then the status.
  #
  # @param expected_input [String] request the server must read
  # @param replys [Array<String>] replies to send, in order
  # @param status status code to finish the call with
  # @param kw metadata pairs the call must carry
  # @return [Thread] the server thread (see wakey_thread)
  def run_server_streamer(expected_input, replys, status, **kw)
    wanted_metadata = kw.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(notifier)
      expect_metadata(c, wanted_metadata)
      expect(c.remote_read).to eq(expected_input)
      replys.each { |r| c.remote_send(r) }
      c.send_status(status, status_details(status), true)
    end
  end

  # Fake server side of a bidi call that reads every expected input before
  # sending any reply, then sends the status.
  #
  # @return [Thread] the server thread (see wakey_thread)
  def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
                                            status)
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(notifier)
      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
      replys.each { |r| c.remote_send(r) }
      c.send_status(status, status_details(status), true)
    end
  end

  # Fake server side of a bidi call that exchanges one message at a time.
  # When +client_starts+ is true the server reads then echoes each message;
  # otherwise it sends each message first and then expects it back.
  #
  # @return [Thread] the server thread (see wakey_thread)
  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(notifier)
      expected_inputs.each do |i|
        if client_starts
          expect(c.remote_read).to eq(i)
          c.remote_send(i)
        else
          c.remote_send(i)
          expect(c.remote_read).to eq(i)
        end
      end
      c.send_status(status, status_details(status), true)
    end
  end

  # Fake server side of a client-streaming call: reads every expected input,
  # checks the received metadata, sends the single response and the status.
  #
  # @return [Thread] the server thread (see wakey_thread)
  def run_client_streamer(expected_inputs, resp, status, **kw)
    wanted_metadata = kw.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(notifier)
      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
      expect_metadata(c, wanted_metadata)
      c.remote_send(resp)
      c.send_status(status, status_details(status), true)
    end
  end

  # Fake server side of a unary call: reads the request, checks the received
  # metadata, sends the response and the status.
  #
  # @return [Thread] the server thread (see wakey_thread)
  def run_request_response(expected_input, resp, status, **kw)
    wanted_metadata = kw.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(notifier)
      expect(c.remote_read).to eq(expected_input)
      expect_metadata(c, wanted_metadata)
      c.remote_send(resp)
      c.send_status(status, status_details(status), true)
    end
  end

  # Creates the server and its completion queue and binds it to '0.0.0.0:0'.
  #
  # @return the value of add_http2_port, which the examples interpolate into
  #   the host string as the port
  def create_test_server
    @server_queue = GRPC::Core::CompletionQueue.new
    @server = GRPC::Core::Server.new(@server_queue, nil)
    @server.add_http2_port('0.0.0.0:0')
  end

  # Starts the server, wakes the thread blocked in wakey_thread, then waits
  # for an incoming call and wraps it in a GRPC::ActiveCall.
  #
  # @param notifier [GRPC::Notifier] notifier supplied by wakey_thread
  # @return [GRPC::ActiveCall] server-side view of the received call
  def expect_server_to_be_invoked(notifier)
    @server.start
    # Release the test thread only once the server has been started.
    notifier.notify(nil)
    server_tag = Object.new
    recvd_rpc = @server.request_call(@server_queue, server_tag,
                                     INFINITE_FUTURE)
    recvd_call = recvd_rpc.call
    # Expose the received metadata on the call so the run_* helpers can
    # check it.
    recvd_call.metadata = recvd_rpc.metadata
    # The batch is given a deadline two seconds from now.
    recvd_call.run_batch(@server_queue, server_tag, Time.now + 2,
                         SEND_INITIAL_METADATA => nil)
    GRPC::ActiveCall.new(recvd_call, @server_queue, noop, noop, INFINITE_FUTURE)
  end

  # Detail string the fake server sends with +status+: 'OK' when the status
  # equals @pass, 'NOK' otherwise.
  def status_details(status)
    status == @pass ? 'OK' : 'NOK'
  end

  # Expects the call's metadata to contain every wanted pair; wanted keys
  # are looked up by their string form.
  def expect_metadata(call, wanted_metadata)
    wanted_metadata.each do |k, v|
      expect(call.metadata[k.to_s]).to eq(v)
    end
  end
end