|  | @@ -31,113 +31,10 @@ require_relative '../grpc'
 | 
	
		
			
				|  |  |  require_relative 'active_call'
 | 
	
		
			
				|  |  |  require_relative 'service'
 | 
	
		
			
				|  |  |  require 'thread'
 | 
	
		
			
				|  |  | +require 'concurrent'
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  # GRPC contains the General RPC module.
 | 
	
		
			
				|  |  |  module GRPC
 | 
	
		
			
				|  |  | -  # Pool is a simple thread pool.
 | 
	
		
			
				|  |  | -  class Pool
 | 
	
		
			
				|  |  | -    # Default keep alive period is 1s
 | 
	
		
			
				|  |  | -    DEFAULT_KEEP_ALIVE = 1
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
 | 
	
		
			
				|  |  | -      fail 'pool size must be positive' unless size > 0
 | 
	
		
			
				|  |  | -      @jobs = Queue.new
 | 
	
		
			
				|  |  | -      @size = size
 | 
	
		
			
				|  |  | -      @stopped = false
 | 
	
		
			
				|  |  | -      @stop_mutex = Mutex.new # needs to be held when accessing @stopped
 | 
	
		
			
				|  |  | -      @stop_cond = ConditionVariable.new
 | 
	
		
			
				|  |  | -      @workers = []
 | 
	
		
			
				|  |  | -      @keep_alive = keep_alive
 | 
	
		
			
				|  |  | -    end
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    # Returns the number of jobs waiting
 | 
	
		
			
				|  |  | -    def jobs_waiting
 | 
	
		
			
				|  |  | -      @jobs.size
 | 
	
		
			
				|  |  | -    end
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    # Runs the given block on the queue with the provided args.
 | 
	
		
			
				|  |  | -    #
 | 
	
		
			
				|  |  | -    # @param args the args passed blk when it is called
 | 
	
		
			
				|  |  | -    # @param blk the block to call
 | 
	
		
			
				|  |  | -    def schedule(*args, &blk)
 | 
	
		
			
				|  |  | -      return if blk.nil?
 | 
	
		
			
				|  |  | -      @stop_mutex.synchronize do
 | 
	
		
			
				|  |  | -        if @stopped
 | 
	
		
			
				|  |  | -          GRPC.logger.warn('did not schedule job, already stopped')
 | 
	
		
			
				|  |  | -          return
 | 
	
		
			
				|  |  | -        end
 | 
	
		
			
				|  |  | -        GRPC.logger.info('schedule another job')
 | 
	
		
			
				|  |  | -        @jobs << [blk, args]
 | 
	
		
			
				|  |  | -      end
 | 
	
		
			
				|  |  | -    end
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    # Starts running the jobs in the thread pool.
 | 
	
		
			
				|  |  | -    def start
 | 
	
		
			
				|  |  | -      @stop_mutex.synchronize do
 | 
	
		
			
				|  |  | -        fail 'already stopped' if @stopped
 | 
	
		
			
				|  |  | -      end
 | 
	
		
			
				|  |  | -      until @workers.size == @size.to_i
 | 
	
		
			
				|  |  | -        next_thread = Thread.new do
 | 
	
		
			
				|  |  | -          catch(:exit) do  # allows { throw :exit } to kill a thread
 | 
	
		
			
				|  |  | -            loop_execute_jobs
 | 
	
		
			
				|  |  | -          end
 | 
	
		
			
				|  |  | -          remove_current_thread
 | 
	
		
			
				|  |  | -        end
 | 
	
		
			
				|  |  | -        @workers << next_thread
 | 
	
		
			
				|  |  | -      end
 | 
	
		
			
				|  |  | -    end
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    # Stops the jobs in the pool
 | 
	
		
			
				|  |  | -    def stop
 | 
	
		
			
				|  |  | -      GRPC.logger.info('stopping, will wait for all the workers to exit')
 | 
	
		
			
				|  |  | -      @workers.size.times { schedule { throw :exit } }
 | 
	
		
			
				|  |  | -      @stop_mutex.synchronize do  # wait @keep_alive for works to stop
 | 
	
		
			
				|  |  | -        @stopped = true
 | 
	
		
			
				|  |  | -        @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
 | 
	
		
			
				|  |  | -      end
 | 
	
		
			
				|  |  | -      forcibly_stop_workers
 | 
	
		
			
				|  |  | -      GRPC.logger.info('stopped, all workers are shutdown')
 | 
	
		
			
				|  |  | -    end
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    protected
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    # Forcibly shutdown any threads that are still alive.
 | 
	
		
			
				|  |  | -    def forcibly_stop_workers
 | 
	
		
			
				|  |  | -      return unless @workers.size > 0
 | 
	
		
			
				|  |  | -      GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
 | 
	
		
			
				|  |  | -      @workers.each do |t|
 | 
	
		
			
				|  |  | -        next unless t.alive?
 | 
	
		
			
				|  |  | -        begin
 | 
	
		
			
				|  |  | -          t.exit
 | 
	
		
			
				|  |  | -        rescue StandardError => e
 | 
	
		
			
				|  |  | -          GRPC.logger.warn('error while terminating a worker')
 | 
	
		
			
				|  |  | -          GRPC.logger.warn(e)
 | 
	
		
			
				|  |  | -        end
 | 
	
		
			
				|  |  | -      end
 | 
	
		
			
				|  |  | -    end
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    # removes the threads from workers, and signal when all the
 | 
	
		
			
				|  |  | -    # threads are complete.
 | 
	
		
			
				|  |  | -    def remove_current_thread
 | 
	
		
			
				|  |  | -      @stop_mutex.synchronize do
 | 
	
		
			
				|  |  | -        @workers.delete(Thread.current)
 | 
	
		
			
				|  |  | -        @stop_cond.signal if @workers.size.zero?
 | 
	
		
			
				|  |  | -      end
 | 
	
		
			
				|  |  | -    end
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    def loop_execute_jobs
 | 
	
		
			
				|  |  | -      loop do
 | 
	
		
			
				|  |  | -        begin
 | 
	
		
			
				|  |  | -          blk, args = @jobs.pop
 | 
	
		
			
				|  |  | -          blk.call(*args)
 | 
	
		
			
				|  |  | -        rescue StandardError => e
 | 
	
		
			
				|  |  | -          GRPC.logger.warn('Error in worker thread')
 | 
	
		
			
				|  |  | -          GRPC.logger.warn(e)
 | 
	
		
			
				|  |  | -        end
 | 
	
		
			
				|  |  | -      end
 | 
	
		
			
				|  |  | -    end
 | 
	
		
			
				|  |  | -  end
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    # RpcServer hosts a number of services and makes them available on the
 | 
	
		
			
				|  |  |    # network.
 | 
	
		
			
				|  |  |    class RpcServer
 | 
	
	
		
			
				|  | @@ -147,11 +44,14 @@ module GRPC
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def_delegators :@server, :add_http2_port
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    # Default thread pool size is 3
 | 
	
		
			
				|  |  | -    DEFAULT_POOL_SIZE = 3
 | 
	
		
			
				|  |  | +    # Default max size of the thread pool size is 100
 | 
	
		
			
				|  |  | +    DEFAULT_MAX_POOL_SIZE = 100
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # Default minimum size of the thread pool is 5
 | 
	
		
			
				|  |  | +    DEFAULT_MIN_POOL_SIZE = 5
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    # Default max_waiting_requests size is 20
 | 
	
		
			
				|  |  | -    DEFAULT_MAX_WAITING_REQUESTS = 20
 | 
	
		
			
				|  |  | +    # Default max_waiting_requests size is 60
 | 
	
		
			
				|  |  | +    DEFAULT_MAX_WAITING_REQUESTS = 60
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      # Default poll period is 1s
 | 
	
		
			
				|  |  |      DEFAULT_POLL_PERIOD = 1
 | 
	
	
		
			
				|  | @@ -174,8 +74,8 @@ module GRPC
 | 
	
		
			
				|  |  |      # There are some specific keyword args used to configure the RpcServer
 | 
	
		
			
				|  |  |      # instance.
 | 
	
		
			
				|  |  |      #
 | 
	
		
			
				|  |  | -    # * pool_size: the size of the thread pool the server uses to run its
 | 
	
		
			
				|  |  | -    # threads
 | 
	
		
			
				|  |  | +    # * pool_size: the maximum size of the thread pool that the server's
 | 
	
		
			
				|  |  | +    # thread pool can reach.
 | 
	
		
			
				|  |  |      #
 | 
	
		
			
				|  |  |      # * max_waiting_requests: the maximum number of requests that are not
 | 
	
		
			
				|  |  |      # being handled to allow. When this limit is exceeded, the server responds
 | 
	
	
		
			
				|  | @@ -191,7 +91,8 @@ module GRPC
 | 
	
		
			
				|  |  |      #
 | 
	
		
			
				|  |  |      # * server_args:
 | 
	
		
			
				|  |  |      # A server arguments hash to be passed down to the underlying core server
 | 
	
		
			
				|  |  | -    def initialize(pool_size:DEFAULT_POOL_SIZE,
 | 
	
		
			
				|  |  | +    def initialize(pool_size:DEFAULT_MAX_POOL_SIZE,
 | 
	
		
			
				|  |  | +                   min_pool_size:DEFAULT_MIN_POOL_SIZE,
 | 
	
		
			
				|  |  |                     max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
 | 
	
		
			
				|  |  |                     poll_period:DEFAULT_POLL_PERIOD,
 | 
	
		
			
				|  |  |                     connect_md_proc:nil,
 | 
	
	
		
			
				|  | @@ -199,8 +100,12 @@ module GRPC
 | 
	
		
			
				|  |  |        @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
 | 
	
		
			
				|  |  |        @max_waiting_requests = max_waiting_requests
 | 
	
		
			
				|  |  |        @poll_period = poll_period
 | 
	
		
			
				|  |  | -      @pool_size = pool_size
 | 
	
		
			
				|  |  | -      @pool = Pool.new(@pool_size)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      @pool = Concurrent::ThreadPoolExecutor.new(
 | 
	
		
			
				|  |  | +        min_threads: [min_pool_size, pool_size].min,
 | 
	
		
			
				|  |  | +        max_threads: pool_size,
 | 
	
		
			
				|  |  | +        max_queue: max_waiting_requests,
 | 
	
		
			
				|  |  | +        fallback_policy: :discard)
 | 
	
		
			
				|  |  |        @run_cond = ConditionVariable.new
 | 
	
		
			
				|  |  |        @run_mutex = Mutex.new
 | 
	
		
			
				|  |  |        # running_state can take 4 values: :not_started, :running, :stopping, and
 | 
	
	
		
			
				|  | @@ -221,7 +126,8 @@ module GRPC
 | 
	
		
			
				|  |  |        end
 | 
	
		
			
				|  |  |        deadline = from_relative_time(@poll_period)
 | 
	
		
			
				|  |  |        @server.close(deadline)
 | 
	
		
			
				|  |  | -      @pool.stop
 | 
	
		
			
				|  |  | +      @pool.shutdown
 | 
	
		
			
				|  |  | +      @pool.wait_for_termination
 | 
	
		
			
				|  |  |      end
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def running_state
 | 
	
	
		
			
				|  | @@ -318,7 +224,6 @@ module GRPC
 | 
	
		
			
				|  |  |      def run
 | 
	
		
			
				|  |  |        @run_mutex.synchronize do
 | 
	
		
			
				|  |  |          fail 'cannot run without registering services' if rpc_descs.size.zero?
 | 
	
		
			
				|  |  | -        @pool.start
 | 
	
		
			
				|  |  |          @server.start
 | 
	
		
			
				|  |  |          transition_running_state(:running)
 | 
	
		
			
				|  |  |          @run_cond.broadcast
 | 
	
	
		
			
				|  | @@ -330,9 +235,11 @@ module GRPC
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
 | 
	
		
			
				|  |  |      def available?(an_rpc)
 | 
	
		
			
				|  |  | -      jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
 | 
	
		
			
				|  |  | +      jobs_count, max = @pool.queue_length, @pool.max_queue
 | 
	
		
			
				|  |  |        GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
 | 
	
		
			
				|  |  | -      return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      # remaining capacity for ThreadPoolExecutors is -1 if unbounded
 | 
	
		
			
				|  |  | +      return an_rpc if @pool.remaining_capacity != 0
 | 
	
		
			
				|  |  |        GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
 | 
	
		
			
				|  |  |        noop = proc { |x| x }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -368,7 +275,7 @@ module GRPC
 | 
	
		
			
				|  |  |            break if (!an_rpc.nil?) && an_rpc.call.nil?
 | 
	
		
			
				|  |  |            active_call = new_active_server_call(an_rpc)
 | 
	
		
			
				|  |  |            unless active_call.nil?
 | 
	
		
			
				|  |  | -            @pool.schedule(active_call) do |ac|
 | 
	
		
			
				|  |  | +            @pool.post(active_call) do |ac|
 | 
	
		
			
				|  |  |                c, mth = ac
 | 
	
		
			
				|  |  |                begin
 | 
	
		
			
				|  |  |                  rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
 |