| 
					
				 | 
			
			
				@@ -100,6 +100,7 @@ module GRPC 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       replys = gen_each_reply.call(each_queued_msg) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       @enq_th = start_write_loop(replys, is_client: false) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       @loop_th = start_read_loop 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      @enq_th.join if @enq_th.alive? 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -115,7 +116,7 @@ module GRPC 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       return enum_for(:each_queued_msg) unless block_given? 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       count = 0 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       loop do 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        GRPC.logger.debug("each_queued_msg: msg##{count}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        GRPC.logger.debug("each_queued_msg: waiting##{count}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         count += 1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         req = @readq.pop 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         GRPC.logger.debug("each_queued_msg: req = #{req}") 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -123,70 +124,73 @@ module GRPC 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         break if req.equal?(END_OF_READS) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         yield req 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-      @enq_th.join if @enq_th.alive? 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # during bidi-streaming, read the requests to send from a separate thread 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # read so that read_loop does not block waiting for requests to read. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     def start_write_loop(requests, is_client: true) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       Thread.new do  # TODO: run on a thread pool 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        write_tag = Object.new 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        GRPC.logger.debug('bidi-write-loop: starting') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         begin 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          write_tag = Object.new 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           count = 0 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           requests.each do |req| 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            GRPC.logger.debug("bidi-write_loop: #{count}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            GRPC.logger.debug("bidi-write-loop: #{count}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             count += 1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             payload = @marshal.call(req) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             @call.run_batch(@cq, write_tag, INFINITE_FUTURE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                             SEND_MESSAGE => payload) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          GRPC.logger.debug("bidi-write-loop: #{count} writes done") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           if is_client 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            @call.run_batch(@cq, write_tag, INFINITE_FUTURE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            SEND_CLOSE_FROM_CLIENT => nil) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                           SEND_CLOSE_FROM_CLIENT => nil, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                            RECV_STATUS_ON_CLIENT => nil) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             batch_result.check_status 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         rescue StandardError => e 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-          GRPC.logger.warn('bidi-write_loop: failed') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          GRPC.logger.warn('bidi-write-loop: failed') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           GRPC.logger.warn(e) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           raise e 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        GRPC.logger.debug('bidi-write-loop: finished') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # starts the read loop 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     def start_read_loop 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       Thread.new do 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        GRPC.logger.debug('bidi-read-loop: starting') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         begin 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           read_tag = Object.new 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           count = 0 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           # queue the initial read before beginning the loop 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           loop do 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            GRPC.logger.debug("bidi-read_loop: #{count}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            GRPC.logger.debug("bidi-read-loop: #{count}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             count += 1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             # TODO: ensure metadata is read if available, currently it's not 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                            RECV_MESSAGE => nil) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             # handle the next message 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             if batch_result.message.nil? 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+              GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				               @readq.push(END_OF_READS) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				               GRPC.logger.debug('bidi-read-loop: done reading!') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				               break 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             # push the latest read onto the queue and continue reading 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            GRPC.logger.debug("received req: #{batch_result.message}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             res = @unmarshal.call(batch_result.message) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             @readq.push(res) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         rescue StandardError => e 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-          GRPC.logger.warn('bidi: read_loop failed') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          GRPC.logger.warn('bidi: read-loop failed') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           GRPC.logger.warn(e) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				           @readq.push(e)  # let each_queued_msg terminate with this error 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        GRPC.logger.debug('bidi-read-loop: finished') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   end 
			 |