| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934 | # Copyright 2015 gRPC authors.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.require 'grpc'Thread.abort_on_exception = truedef wakey_thread(&blk)  n = GRPC::Notifier.new  t = Thread.new do    blk.call(n)  end  t.abort_on_exception = true  n.wait  tenddef load_test_certs  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')  files = ['ca.pem', 'server1.key', 'server1.pem']  files.map { |f| File.open(File.join(test_root, f)).read }endinclude GRPC::Core::StatusCodesinclude GRPC::Core::TimeConstsinclude GRPC::Core::CallOps# check that methods on a finished/closed call t crashdef check_op_view_of_finished_client_call(op_view,                                          expected_metadata,                                          expected_trailing_metadata)  # use read_response_stream to try to iterate through  # possible response stream  fail('need something to attempt reads') unless block_given?  expect do    resp = op_view.execute    yield resp  end.to raise_error(GRPC::Core::CallError)  expect { op_view.start_call }.to raise_error(RuntimeError)  sanity_check_values_of_accessors(op_view,                                   expected_metadata,                                   expected_trailing_metadata)  expect do    op_view.wait    op_view.cancel    op_view.write_flag = 1  end.to_not raise_errorenddef sanity_check_values_of_accessors(op_view,                                     expected_metadata,                                     expected_trailing_metadata)  expected_status = Struct::Status.new  expected_status.code = 0  expected_status.details = 'OK'  expected_status.metadata = expected_trailing_metadata  expect(op_view.status).to eq(expected_status)  expect(op_view.metadata).to eq(expected_metadata)  expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)  expect(op_view.cancelled?).to be(false)  expect(op_view.write_flag).to be(nil)  # The deadline attribute of a call can be either  # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.  # TODO: fix so that the accessor always returns the same type.  expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||         op_view.deadline.is_a?(Time)).to be(true)enddef close_active_server_call(active_server_call)  active_server_call.send(:set_input_stream_done)  active_server_call.send(:set_output_stream_done)enddescribe 'ClientStub' do  # rubocop:disable Metrics/BlockLength  let(:noop) { proc { |x| x } }  before(:each) do    Thread.abort_on_exception = true    @server = nil    @method = 'an_rpc_method'    @pass = OK    @fail = INTERNAL    @metadata = { k1: 'v1', k2: 'v2' }  end  after(:each) do    unless @server.nil?      @server.shutdown_and_notify(from_relative_time(2))      @server.close    end  end  describe '#new' do    let(:fake_host) { 'localhost:0' }    it 'can be created from a host and args' do      opts = { channel_args: { a_channel_arg: 'an_arg' } }      blk = proc do        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)      end      expect(&blk).not_to raise_error    end    it 'can be created with an channel override' do      opts = {        channel_args: { a_channel_arg: 'an_arg' },        channel_override: @ch      }      blk = proc do        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)      end      expect(&blk).not_to raise_error    end    it 'cannot be created with a bad channel override' do      blk = proc do        opts = {          channel_args: { a_channel_arg: 'an_arg' },          channel_override: Object.new        }        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)      end      expect(&blk).to raise_error    end    it 'cannot be created with bad credentials' do      blk = proc do        opts = { channel_args: { a_channel_arg: 'an_arg' } }        GRPC::ClientStub.new(fake_host, Object.new, **opts)      end      expect(&blk).to raise_error    end    it 'can be created with test test credentials' do      certs = load_test_certs      blk = proc do        opts = {          channel_args: {            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',            a_channel_arg: 'an_arg'          }        }        creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)        GRPC::ClientStub.new(fake_host, creds,  **opts)      end      expect(&blk).to_not raise_error    end  end  describe '#request_response', request_response: true do    before(:each) do      @sent_msg, @resp = 'a_msg', 'a_reply'    end    shared_examples 'request response' do      it 'should send a request to/receive a reply from a server' do        server_port = create_test_server        th = run_request_response(@sent_msg, @resp, @pass)        stub = GRPC::ClientStub.new("localhost:#{server_port}",                                    :this_channel_is_insecure)        expect(get_response(stub)).to eq(@resp)        th.join      end      def metadata_test(md)        server_port = create_test_server        host = "localhost:#{server_port}"        th = run_request_response(@sent_msg, @resp, @pass,                                  expected_metadata: md)        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        @metadata = md        expect(get_response(stub)).to eq(@resp)        th.join      end      it 'should send metadata to the server ok' do        metadata_test(k1: 'v1', k2: 'v2')      end      # these tests mostly try to exercise when md might be allocated      # instead of inlined      it 'should send metadata with multiple large md to the server ok' do        val_array = %w(          '00000000000000000000000000000000000000000000000000000000000000',          '11111111111111111111111111111111111111111111111111111111111111',          '22222222222222222222222222222222222222222222222222222222222222',        )        md = {          k1: val_array,          k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',          k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',          k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc',          keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5',          'k66666666666666666666666666666666666666666666666666666' => 'v6',          'k77777777777777777777777777777777777777777777777777777' => 'v7',          'k88888888888888888888888888888888888888888888888888888' => 'v8'        }        metadata_test(md)      end      it 'should send a request when configured using an override channel' do        server_port = create_test_server        alt_host = "localhost:#{server_port}"        th = run_request_response(@sent_msg, @resp, @pass)        ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)        stub = GRPC::ClientStub.new('ignored-host',                                    :this_channel_is_insecure,                                    channel_override: ch)        expect(get_response(stub)).to eq(@resp)        th.join      end      it 'should raise an error if the status is not OK' do        server_port = create_test_server        host = "localhost:#{server_port}"        th = run_request_response(@sent_msg, @resp, @fail)        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        blk = proc { get_response(stub) }        expect(&blk).to raise_error(GRPC::BadStatus)        th.join      end      it 'should receive UNAVAILABLE if call credentials plugin fails' do        server_port = create_secure_test_server        server_started_notifier = GRPC::Notifier.new        th = Thread.new do          @server.start          server_started_notifier.notify(nil)          # Poll on the server so that the client connection can proceed.          # We don't expect the server to actually accept a call though.          expect { @server.request_call }.to raise_error(GRPC::Core::CallError)        end        server_started_notifier.wait        certs = load_test_certs        secure_channel_creds = GRPC::Core::ChannelCredentials.new(          certs[0], nil, nil)        secure_stub_opts = {          channel_args: {            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'          }        }        stub = GRPC::ClientStub.new("localhost:#{server_port}",                                    secure_channel_creds, **secure_stub_opts)        error_message = 'Failing call credentials callback'        failing_auth = proc do          fail error_message        end        creds = GRPC::Core::CallCredentials.new(failing_auth)        unavailable_error_occured = false        begin          get_response(stub, credentials: creds)        rescue GRPC::Unavailable => e          unavailable_error_occured = true          expect(e.details.include?(error_message)).to be true        end        expect(unavailable_error_occured).to eq(true)        @server.shutdown_and_notify(Time.now + 3)        th.join        @server.close      end      it 'should raise ArgumentError if metadata contains invalid values' do        @metadata.merge!(k3: 3)        server_port = create_test_server        host = "localhost:#{server_port}"        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        expect do          get_response(stub)        end.to raise_error(ArgumentError,                           /Header values must be of type string or array/)      end    end    describe 'without a call operation' do      def get_response(stub, credentials: nil)        puts credentials.inspect        stub.request_response(@method, @sent_msg, noop, noop,                              metadata: @metadata,                              credentials: credentials)      end      it_behaves_like 'request response'    end    describe 'via a call operation' do      after(:each) do        # make sure op.wait doesn't hang, even if there's a bad status        @op.wait      end      def get_response(stub, run_start_call_first: false, credentials: nil)        @op = stub.request_response(@method, @sent_msg, noop, noop,                                    return_op: true,                                    metadata: @metadata,                                    deadline: from_relative_time(2),                                    credentials: credentials)        expect(@op).to be_a(GRPC::ActiveCall::Operation)        @op.start_call if run_start_call_first        result = @op.execute        result      end      it_behaves_like 'request response'      def run_op_view_metadata_test(run_start_call_first)        server_port = create_test_server        host = "localhost:#{server_port}"        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }        th = run_request_response(          @sent_msg, @resp, @pass,          expected_metadata: @metadata,          server_initial_md: @server_initial_md,          server_trailing_md: @server_trailing_md)        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        expect(          get_response(stub,                       run_start_call_first: run_start_call_first)).to eq(@resp)        th.join      end      it 'sends metadata to the server ok when running start_call first' do        run_op_view_metadata_test(true)        check_op_view_of_finished_client_call(          @op, @server_initial_md, @server_trailing_md) { |r| p r }      end      it 'does not crash when used after the call has been finished' do        run_op_view_metadata_test(false)        check_op_view_of_finished_client_call(          @op, @server_initial_md, @server_trailing_md) { |r| p r }      end    end  end  describe '#client_streamer', client_streamer: true do    before(:each) do      Thread.abort_on_exception = true      server_port = create_test_server      host = "localhost:#{server_port}"      @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }      @resp = 'a_reply'    end    shared_examples 'client streaming' do      it 'should send requests to/receive a reply from a server' do        th = run_client_streamer(@sent_msgs, @resp, @pass)        expect(get_response(@stub)).to eq(@resp)        th.join      end      it 'should send metadata to the server ok' do        th = run_client_streamer(@sent_msgs, @resp, @pass,                                 expected_metadata: @metadata)        expect(get_response(@stub)).to eq(@resp)        th.join      end      it 'should raise an error if the status is not ok' do        th = run_client_streamer(@sent_msgs, @resp, @fail)        blk = proc { get_response(@stub) }        expect(&blk).to raise_error(GRPC::BadStatus)        th.join      end      it 'should raise ArgumentError if metadata contains invalid values' do        @metadata.merge!(k3: 3)        expect do          get_response(@stub)        end.to raise_error(ArgumentError,                           /Header values must be of type string or array/)      end    end    describe 'without a call operation' do      def get_response(stub)        stub.client_streamer(@method, @sent_msgs, noop, noop,                             metadata: @metadata)      end      it_behaves_like 'client streaming'    end    describe 'via a call operation' do      after(:each) do        # make sure op.wait doesn't hang, even if there's a bad status        @op.wait      end      def get_response(stub, run_start_call_first: false)        @op = stub.client_streamer(@method, @sent_msgs, noop, noop,                                   return_op: true, metadata: @metadata)        expect(@op).to be_a(GRPC::ActiveCall::Operation)        @op.start_call if run_start_call_first        result = @op.execute        result      end      it_behaves_like 'client streaming'      def run_op_view_metadata_test(run_start_call_first)        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }        th = run_client_streamer(          @sent_msgs, @resp, @pass,          expected_metadata: @metadata,          server_initial_md: @server_initial_md,          server_trailing_md: @server_trailing_md)        expect(          get_response(@stub,                       run_start_call_first: run_start_call_first)).to eq(@resp)        th.join      end      it 'sends metadata to the server ok when running start_call first' do        run_op_view_metadata_test(true)        check_op_view_of_finished_client_call(          @op, @server_initial_md, @server_trailing_md) { |r| p r }      end      it 'does not crash when used after the call has been finished' do        run_op_view_metadata_test(false)        check_op_view_of_finished_client_call(          @op, @server_initial_md, @server_trailing_md) { |r| p r }      end    end  end  describe '#server_streamer', server_streamer: true do    before(:each) do      @sent_msg = 'a_msg'      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }    end    shared_examples 'server streaming' do      it 'should send a request to/receive replies from a server' do        server_port = create_test_server        host = "localhost:#{server_port}"        th = run_server_streamer(@sent_msg, @replys, @pass)        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        expect(get_responses(stub).collect { |r| r }).to eq(@replys)        th.join      end      it 'should raise an error if the status is not ok' do        server_port = create_test_server        host = "localhost:#{server_port}"        th = run_server_streamer(@sent_msg, @replys, @fail)        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        e = get_responses(stub)        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)        th.join      end      it 'should send metadata to the server ok' do        server_port = create_test_server        host = "localhost:#{server_port}"        th = run_server_streamer(@sent_msg, @replys, @fail,                                 expected_metadata: { k1: 'v1', k2: 'v2' })        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        e = get_responses(stub)        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)        th.join      end      it 'should raise ArgumentError if metadata contains invalid values' do        @metadata.merge!(k3: 3)        server_port = create_test_server        host = "localhost:#{server_port}"        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        expect do          get_responses(stub).collect { |r| r }        end.to raise_error(ArgumentError,                           /Header values must be of type string or array/)      end      def run_server_streamer_against_client_with_unmarshal_error(        expected_input, replys)        wakey_thread do |notifier|          c = expect_server_to_be_invoked(notifier)          expect(c.remote_read).to eq(expected_input)          begin            replys.each { |r| c.remote_send(r) }          rescue GRPC::Core::CallError            # An attempt to write to the client might fail. This is ok            # because the client call is expected to fail when            # unmarshalling the first response, and to cancel the call,            # and there is a race as for when the server-side call will            # start to fail.            p 'remote_send failed (allowed because call expected to cancel)'          ensure            c.send_status(OK, 'OK', true)            close_active_server_call(c)          end        end      end      it 'the call terminates when there is an unmarshalling error' do        server_port = create_test_server        host = "localhost:#{server_port}"        th = run_server_streamer_against_client_with_unmarshal_error(          @sent_msg, @replys)        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }        expect do          get_responses(stub, unmarshal: unmarshal).collect { |r| r }        end.to raise_error(ArgumentError, 'test unmarshalling error')        th.join      end    end    describe 'without a call operation' do      def get_responses(stub, unmarshal: noop)        e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,                                 metadata: @metadata)        expect(e).to be_a(Enumerator)        e      end      it_behaves_like 'server streaming'    end    describe 'via a call operation' do      after(:each) do        @op.wait # make sure wait doesn't hang      end      def get_responses(stub, run_start_call_first: false, unmarshal: noop)        @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,                                   return_op: true,                                   metadata: @metadata)        expect(@op).to be_a(GRPC::ActiveCall::Operation)        @op.start_call if run_start_call_first        e = @op.execute        expect(e).to be_a(Enumerator)        e      end      it_behaves_like 'server streaming'      def run_op_view_metadata_test(run_start_call_first)        server_port = create_test_server        host = "localhost:#{server_port}"        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }        th = run_server_streamer(          @sent_msg, @replys, @pass,          expected_metadata: @metadata,          server_initial_md: @server_initial_md,          server_trailing_md: @server_trailing_md)        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)        e = get_responses(stub, run_start_call_first: run_start_call_first)        expect(e.collect { |r| r }).to eq(@replys)        th.join      end      it 'should send metadata to the server ok when start_call is run first' do        run_op_view_metadata_test(true)        check_op_view_of_finished_client_call(          @op, @server_initial_md, @server_trailing_md) do |responses|          responses.each { |r| p r }        end      end      it 'does not crash when used after the call has been finished' do        run_op_view_metadata_test(false)        check_op_view_of_finished_client_call(          @op, @server_initial_md, @server_trailing_md) do |responses|          responses.each { |r| p r }        end      end    end  end  describe '#bidi_streamer', bidi: true do    before(:each) do      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }      server_port = create_test_server      @host = "localhost:#{server_port}"    end    shared_examples 'bidi streaming' do      it 'supports sending all the requests first' do        th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,                                                   @pass)        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)        e = get_responses(stub)        expect(e.collect { |r| r }).to eq(@replys)        th.join      end      it 'supports client-initiated ping pong' do        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)        e = get_responses(stub)        expect(e.collect { |r| r }).to eq(@sent_msgs)        th.join      end      it 'supports a server-initiated ping pong' do        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)        e = get_responses(stub)        expect(e.collect { |r| r }).to eq(@sent_msgs)        th.join      end      it 'should raise an error if the status is not ok' do        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)        e = get_responses(stub)        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)        th.join      end      it 'should raise ArgumentError if metadata contains invalid values' do        @metadata.merge!(k3: 3)        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)        expect do          get_responses(stub).collect { |r| r }        end.to raise_error(ArgumentError,                           /Header values must be of type string or array/)      end      it 'terminates if the call fails to start' do        # don't start the server        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)        expect do          get_responses(stub, deadline: from_relative_time(0)).collect { |r| r }        end.to raise_error(GRPC::BadStatus)      end      it 'should send metadata to the server ok' do        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,                                              expected_metadata: @metadata)        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)        e = get_responses(stub)        expect(e.collect { |r| r }).to eq(@sent_msgs)        th.join      end      # Prompted by grpc/github #10526      describe 'surfacing of errors when sending requests' do        def run_server_bidi_send_one_then_read_indefinitely          @server.start          recvd_rpc = @server.request_call          recvd_call = recvd_rpc.call          server_call = GRPC::ActiveCall.new(            recvd_call, noop, noop, INFINITE_FUTURE,            metadata_received: true, started: false)          server_call.send_initial_metadata          server_call.remote_send('server response')          loop do            m = server_call.remote_read            break if m.nil?          end          # can't fail since initial metadata already sent          server_call.send_status(@pass, 'OK', true)          close_active_server_call(server_call)        end        def verify_error_from_write_thread(stub, requests_to_push,                                           request_queue, expected_description)          # TODO: an improvement might be to raise the original exception from          # bidi call write loops instead of only cancelling the call          failing_marshal_proc = proc do |req|            fail req if req.is_a?(StandardError)            req          end          begin            e = get_responses(stub, marshal_proc: failing_marshal_proc)            first_response = e.next            expect(first_response).to eq('server response')            requests_to_push.each { |req| request_queue.push(req) }            e.collect { |r| r }          rescue GRPC::Unknown => e            exception = e          end          expect(exception.message.include?(expected_description)).to be(true)        end        # Provides an Enumerable view of a Queue        class BidiErrorTestingEnumerateForeverQueue          def initialize(queue)            @queue = queue          end          def each            loop do              msg = @queue.pop              yield msg            end          end        end        def run_error_in_client_request_stream_test(requests_to_push,                                                    expected_error_message)          # start a server that waits on a read indefinitely - it should          # see a cancellation and be able to break out          th = Thread.new { run_server_bidi_send_one_then_read_indefinitely }          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)          request_queue = Queue.new          @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue)          verify_error_from_write_thread(stub,                                         requests_to_push,                                         request_queue,                                         expected_error_message)          # the write loop errror should cancel the call and end the          # server's request stream          th.join        end        it 'non-GRPC errors from the write loop surface when raised ' \          'at the start of a request stream' do          expected_error_message = 'expect error on first request'          requests_to_push = [StandardError.new(expected_error_message)]          run_error_in_client_request_stream_test(requests_to_push,                                                  expected_error_message)        end        it 'non-GRPC errors from the write loop surface when raised ' \          'during the middle of a request stream' do          expected_error_message = 'expect error on last request'          requests_to_push = %w( one two )          requests_to_push << StandardError.new(expected_error_message)          run_error_in_client_request_stream_test(requests_to_push,                                                  expected_error_message)        end      end    end    describe 'without a call operation' do      def get_responses(stub, deadline: nil, marshal_proc: noop)        e = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,                               metadata: @metadata, deadline: deadline)        expect(e).to be_a(Enumerator)        e      end      it_behaves_like 'bidi streaming'    end    describe 'via a call operation' do      after(:each) do        @op.wait # make sure wait doesn't hang      end      def get_responses(stub, run_start_call_first: false, deadline: nil,                        marshal_proc: noop)        @op = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,                                 return_op: true,                                 metadata: @metadata, deadline: deadline)        expect(@op).to be_a(GRPC::ActiveCall::Operation)        @op.start_call if run_start_call_first        e = @op.execute        expect(e).to be_a(Enumerator)        e      end      it_behaves_like 'bidi streaming'      def run_op_view_metadata_test(run_start_call_first)        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }        th = run_bidi_streamer_echo_ping_pong(          @sent_msgs, @pass, true,          expected_metadata: @metadata,          server_initial_md: @server_initial_md,          server_trailing_md: @server_trailing_md)        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)        e = get_responses(stub, run_start_call_first: run_start_call_first)        expect(e.collect { |r| r }).to eq(@sent_msgs)        th.join      end      it 'can run start_call before executing the call' do        run_op_view_metadata_test(true)        check_op_view_of_finished_client_call(          @op, @server_initial_md, @server_trailing_md) do |responses|          responses.each { |r| p r }        end      end      it 'doesnt crash when op_view used after call has finished' do        run_op_view_metadata_test(false)        check_op_view_of_finished_client_call(          @op, @server_initial_md, @server_trailing_md) do |responses|          responses.each { |r| p r }        end      end    end  end  def run_server_streamer(expected_input, replys, status,                          expected_metadata: {},                          server_initial_md: {},                          server_trailing_md: {})    wanted_metadata = expected_metadata.clone    wakey_thread do |notifier|      c = expect_server_to_be_invoked(        notifier, metadata_to_send: server_initial_md)      wanted_metadata.each do |k, v|        expect(c.metadata[k.to_s]).to eq(v)      end      expect(c.remote_read).to eq(expected_input)      replys.each { |r| c.remote_send(r) }      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,                    metadata: server_trailing_md)      close_active_server_call(c)    end  end  def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,                                            status)    wakey_thread do |notifier|      c = expect_server_to_be_invoked(notifier)      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }      replys.each { |r| c.remote_send(r) }      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)      close_active_server_call(c)    end  end  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,                                       expected_metadata: {},                                       server_initial_md: {},                                       server_trailing_md: {})    wanted_metadata = expected_metadata.clone    wakey_thread do |notifier|      c = expect_server_to_be_invoked(        notifier, metadata_to_send: server_initial_md)      wanted_metadata.each do |k, v|        expect(c.metadata[k.to_s]).to eq(v)      end      expected_inputs.each do |i|        if client_starts          expect(c.remote_read).to eq(i)          c.remote_send(i)        else          c.remote_send(i)          expect(c.remote_read).to eq(i)        end      end      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,                    metadata: server_trailing_md)      close_active_server_call(c)    end  end  def run_client_streamer(expected_inputs, resp, status,                          expected_metadata: {},                          server_initial_md: {},                          server_trailing_md: {})    wanted_metadata = expected_metadata.clone    wakey_thread do |notifier|      c = expect_server_to_be_invoked(        notifier, metadata_to_send: server_initial_md)      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }      wanted_metadata.each do |k, v|        expect(c.metadata[k.to_s]).to eq(v)      end      c.remote_send(resp)      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,                    metadata: server_trailing_md)      close_active_server_call(c)    end  end  def run_request_response(expected_input, resp, status,                           expected_metadata: {},                           server_initial_md: {},                           server_trailing_md: {})    wanted_metadata = expected_metadata.clone    wakey_thread do |notifier|      c = expect_server_to_be_invoked(        notifier, metadata_to_send: server_initial_md)      expect(c.remote_read).to eq(expected_input)      wanted_metadata.each do |k, v|        expect(c.metadata[k.to_s]).to eq(v)      end      c.remote_send(resp)      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,                    metadata: server_trailing_md)      close_active_server_call(c)    end  end  def create_secure_test_server    certs = load_test_certs    secure_credentials = GRPC::Core::ServerCredentials.new(      nil, [{ private_key: certs[1], cert_chain: certs[2] }], false)    @server = new_core_server_for_testing(nil)    @server.add_http2_port('0.0.0.0:0', secure_credentials)  end  def create_test_server    @server = new_core_server_for_testing(nil)    @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)  end  def expect_server_to_be_invoked(notifier, metadata_to_send: nil)    @server.start    notifier.notify(nil)    recvd_rpc = @server.request_call    recvd_call = recvd_rpc.call    recvd_call.metadata = recvd_rpc.metadata    recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)    GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,                         metadata_received: true)  endend
 |