|  | @@ -750,6 +750,90 @@ describe 'ClientStub' do  # rubocop:disable Metrics/BlockLength
 | 
	
		
			
				|  |  |                                                    expected_error_message)
 | 
	
		
			
				|  |  |          end
 | 
	
		
			
				|  |  |        end
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      # Prompted by grpc/github #14853
 | 
	
		
			
				|  |  | +      describe 'client-side error handling on bidi streams' do
 | 
	
		
			
				|  |  | +        class EnumeratorQueue
 | 
	
		
			
				|  |  | +          def initialize(queue)
 | 
	
		
			
				|  |  | +            @queue = queue
 | 
	
		
			
				|  |  | +          end
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +          def each
 | 
	
		
			
				|  |  | +            loop do
 | 
	
		
			
				|  |  | +              msg = @queue.pop
 | 
	
		
			
				|  |  | +              break if msg.nil?
 | 
	
		
			
				|  |  | +              yield msg
 | 
	
		
			
				|  |  | +            end
 | 
	
		
			
				|  |  | +          end
 | 
	
		
			
				|  |  | +        end
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        def run_server_bidi_shutdown_after_one_read
 | 
	
		
			
				|  |  | +          @server.start
 | 
	
		
			
				|  |  | +          recvd_rpc = @server.request_call
 | 
	
		
			
				|  |  | +          recvd_call = recvd_rpc.call
 | 
	
		
			
				|  |  | +          server_call = GRPC::ActiveCall.new(
 | 
	
		
			
				|  |  | +            recvd_call, noop, noop, INFINITE_FUTURE,
 | 
	
		
			
				|  |  | +            metadata_received: true, started: false)
 | 
	
		
			
				|  |  | +          expect(server_call.remote_read).to eq('first message')
 | 
	
		
			
				|  |  | +          @server.shutdown_and_notify(from_relative_time(0))
 | 
	
		
			
				|  |  | +          @server.close
 | 
	
		
			
				|  |  | +        end
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        it 'receives a grpc status code when writes to a bidi stream fail' do
 | 
	
		
			
				|  |  | +          # This test tries to trigger the case when a 'SEND_MESSAGE' op
 | 
	
		
			
				|  |  | +          # and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
 | 
	
		
			
				|  |  | +          # In this case, iteration through the response stream should result
 | 
	
		
			
				|  |  | +          # in a grpc status code, and the writer thread should not raise an
 | 
	
		
			
				|  |  | +          # exception.
 | 
	
		
			
				|  |  | +          server_thread = Thread.new do
 | 
	
		
			
				|  |  | +            run_server_bidi_shutdown_after_one_read
 | 
	
		
			
				|  |  | +          end
 | 
	
		
			
				|  |  | +          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
 | 
	
		
			
				|  |  | +          request_queue = Queue.new
 | 
	
		
			
				|  |  | +          @sent_msgs = EnumeratorQueue.new(request_queue)
 | 
	
		
			
				|  |  | +          responses = get_responses(stub)
 | 
	
		
			
				|  |  | +          request_queue.push('first message')
 | 
	
		
			
				|  |  | +          # Now wait for the server to shut down.
 | 
	
		
			
				|  |  | +          server_thread.join
 | 
	
		
			
				|  |  | +          # Sanity check. This test is not interesting if
 | 
	
		
			
				|  |  | +          # Thread.abort_on_exception is not set.
 | 
	
		
			
				|  |  | +          expect(Thread.abort_on_exception).to be(true)
 | 
	
		
			
				|  |  | +          # An attempt to send a second message should fail now that the
 | 
	
		
			
				|  |  | +          # server is down.
 | 
	
		
			
				|  |  | +          request_queue.push('second message')
 | 
	
		
			
				|  |  | +          request_queue.push(nil)
 | 
	
		
			
				|  |  | +          expect { responses.next }.to raise_error(GRPC::BadStatus)
 | 
	
		
			
				|  |  | +        end
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        def run_server_bidi_shutdown_after_one_write
 | 
	
		
			
				|  |  | +          @server.start
 | 
	
		
			
				|  |  | +          recvd_rpc = @server.request_call
 | 
	
		
			
				|  |  | +          recvd_call = recvd_rpc.call
 | 
	
		
			
				|  |  | +          server_call = GRPC::ActiveCall.new(
 | 
	
		
			
				|  |  | +            recvd_call, noop, noop, INFINITE_FUTURE,
 | 
	
		
			
				|  |  | +            metadata_received: true, started: false)
 | 
	
		
			
				|  |  | +          server_call.send_initial_metadata
 | 
	
		
			
				|  |  | +          server_call.remote_send('message')
 | 
	
		
			
				|  |  | +          @server.shutdown_and_notify(from_relative_time(0))
 | 
	
		
			
				|  |  | +          @server.close
 | 
	
		
			
				|  |  | +        end
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        it 'receives a grpc status code when reading from a failed bidi call' do
 | 
	
		
			
				|  |  | +          server_thread = Thread.new do
 | 
	
		
			
				|  |  | +            run_server_bidi_shutdown_after_one_write
 | 
	
		
			
				|  |  | +          end
 | 
	
		
			
				|  |  | +          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
 | 
	
		
			
				|  |  | +          request_queue = Queue.new
 | 
	
		
			
				|  |  | +          @sent_msgs = EnumeratorQueue.new(request_queue)
 | 
	
		
			
				|  |  | +          responses = get_responses(stub)
 | 
	
		
			
				|  |  | +          expect(responses.next).to eq('message')
 | 
	
		
			
				|  |  | +          # Wait for the server to shut down
 | 
	
		
			
				|  |  | +          server_thread.join
 | 
	
		
			
				|  |  | +          expect { responses.next }.to raise_error(GRPC::BadStatus)
 | 
	
		
			
				|  |  | +          # Push a sentinel to allow the writer thread to finish
 | 
	
		
			
				|  |  | +          request_queue.push(nil)
 | 
	
		
			
				|  |  | +        end
 | 
	
		
			
				|  |  | +      end
 | 
	
		
			
				|  |  |      end
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      describe 'without a call operation' do
 | 
	
	
		
			
				|  | @@ -810,6 +894,55 @@ describe 'ClientStub' do  # rubocop:disable Metrics/BlockLength
 | 
	
		
			
				|  |  |            responses.each { |r| p r }
 | 
	
		
			
				|  |  |          end
 | 
	
		
			
				|  |  |        end
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
 | 
	
		
			
				|  |  | +        @server.start
 | 
	
		
			
				|  |  | +        recvd_rpc = @server.request_call
 | 
	
		
			
				|  |  | +        recvd_call = recvd_rpc.call
 | 
	
		
			
				|  |  | +        server_call = GRPC::ActiveCall.new(
 | 
	
		
			
				|  |  | +          recvd_call, noop, noop, INFINITE_FUTURE,
 | 
	
		
			
				|  |  | +          metadata_received: true, started: false)
 | 
	
		
			
				|  |  | +        server_call.send_initial_metadata
 | 
	
		
			
				|  |  | +        server_call.remote_send('server call received')
 | 
	
		
			
				|  |  | +        wait_for_shutdown_ok_callback.call
 | 
	
		
			
				|  |  | +        # since the client is cancelling the call,
 | 
	
		
			
				|  |  | +        # we should be able to shut down cleanly
 | 
	
		
			
				|  |  | +        @server.shutdown_and_notify(nil)
 | 
	
		
			
				|  |  | +        @server.close
 | 
	
		
			
				|  |  | +      end
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      it 'receives a grpc status code when reading from a cancelled bidi call' do
 | 
	
		
			
				|  |  | +        # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or
 | 
	
		
			
				|  |  | +        # 'RECV_MESSAGE' op failure.
 | 
	
		
			
				|  |  | +        # An attempt to read a message might fail; in that case, iteration
 | 
	
		
			
				|  |  | +        # through the response stream should still result in a grpc status.
 | 
	
		
			
				|  |  | +        server_can_shutdown = false
 | 
	
		
			
				|  |  | +        server_can_shutdown_mu = Mutex.new
 | 
	
		
			
				|  |  | +        server_can_shutdown_cv = ConditionVariable.new
 | 
	
		
			
				|  |  | +        wait_for_shutdown_ok_callback = proc do
 | 
	
		
			
				|  |  | +          server_can_shutdown_mu.synchronize do
 | 
	
		
			
				|  |  | +            server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown
 | 
	
		
			
				|  |  | +          end
 | 
	
		
			
				|  |  | +        end
 | 
	
		
			
				|  |  | +        server_thread = Thread.new do
 | 
	
		
			
				|  |  | +          run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
 | 
	
		
			
				|  |  | +        end
 | 
	
		
			
				|  |  | +        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
 | 
	
		
			
				|  |  | +        request_queue = Queue.new
 | 
	
		
			
				|  |  | +        @sent_msgs = EnumeratorQueue.new(request_queue)
 | 
	
		
			
				|  |  | +        responses = get_responses(stub)
 | 
	
		
			
				|  |  | +        expect(responses.next).to eq('server call received')
 | 
	
		
			
				|  |  | +        @op.cancel
 | 
	
		
			
				|  |  | +        expect { responses.next }.to raise_error(GRPC::Cancelled)
 | 
	
		
			
				|  |  | +        # Now let the server proceed to shut down.
 | 
	
		
			
				|  |  | +        server_can_shutdown_mu.synchronize do
 | 
	
		
			
				|  |  | +          server_can_shutdown = true
 | 
	
		
			
				|  |  | +          server_can_shutdown_cv.broadcast
 | 
	
		
			
				|  |  | +        end
 | 
	
		
			
				|  |  | +        server_thread.join
 | 
	
		
			
				|  |  | +        # Push a sentinel to allow the writer thread to finish
 | 
	
		
			
				|  |  | +        request_queue.push(nil)
 | 
	
		
			
				|  |  | +      end
 | 
	
		
			
				|  |  |      end
 | 
	
		
			
				|  |  |    end
 | 
	
		
			
				|  |  |  
 |