Эх сурвалжийг харах

Fixes a multithreading bug in the ruby rpc_server

This fixes the current ruby server interop test flakiness.

- The interop tests recently changed so that clients access servers in
parallel, meaning that the interop servers handle multiple rpcs at once,
which is how this bug became visible

- since this change, tests run against the ruby interop server have
failed sporadically

- the problem was that the block in #loop_handle_server_calls that is
  passed to the thread pool referenced a var in an enclosing block,
  which resulted in requests being processed by the wrong handler

- this fix ensures that the block to be run on the thread pool thread
  does not have any references to vars in the enclosing block
Tim Emiola 10 жил өмнө
parent
commit
7d21c04b2c

+ 11 - 8
src/ruby/lib/grpc/generic/rpc_server.rb

@@ -418,11 +418,11 @@ module GRPC
           an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
           an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
           break if (!an_rpc.nil?) && an_rpc.call.nil?
           break if (!an_rpc.nil?) && an_rpc.call.nil?
 
 
-          c = new_active_server_call(an_rpc)
-          unless c.nil?
-            mth = an_rpc.method.to_sym
-            @pool.schedule(c) do |call|
-              rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
+          active_call = new_active_server_call(an_rpc)
+          unless active_call.nil?
+            @pool.schedule(active_call) do |ac|
+              c, mth = ac
+              rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
             end
             end
           end
           end
         rescue Core::CallError, RuntimeError => e
         rescue Core::CallError, RuntimeError => e
@@ -442,6 +442,7 @@ module GRPC
       # allow the metadata to be accessed from the call
       # allow the metadata to be accessed from the call
       handle_call_tag = Object.new
       handle_call_tag = Object.new
       an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
       an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
+      GRPC.logger.debug("call md is #{an_rpc.metadata}")
       connect_md = nil
       connect_md = nil
       unless @connect_md_proc.nil?
       unless @connect_md_proc.nil?
         connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
         connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
@@ -454,9 +455,11 @@ module GRPC
       # Create the ActiveCall
       # Create the ActiveCall
       GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
       GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
       rpc_desc = rpc_descs[an_rpc.method.to_sym]
       rpc_desc = rpc_descs[an_rpc.method.to_sym]
-      ActiveCall.new(an_rpc.call, @cq,
-                     rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
-                     an_rpc.deadline)
+      c = ActiveCall.new(an_rpc.call, @cq,
+                         rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
+                         an_rpc.deadline)
+      mth = an_rpc.method.to_sym
+      [c, mth]
     end
     end
 
 
     protected
     protected