rpc_server_pool_spec.rb 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. require 'spec_helper'
  15. Thread.abort_on_exception = true
  16. describe GRPC::Pool do
  17. Pool = GRPC::Pool
  18. describe '#new' do
  19. it 'raises if a non-positive size is used' do
  20. expect { Pool.new(0) }.to raise_error
  21. expect { Pool.new(-1) }.to raise_error
  22. expect { Pool.new(Object.new) }.to raise_error
  23. end
  24. it 'is constructed OK with a positive size' do
  25. expect { Pool.new(1) }.not_to raise_error
  26. end
  27. end
  28. describe '#ready_for_work?' do
  29. it 'before start it is not ready' do
  30. p = Pool.new(1)
  31. expect(p.ready_for_work?).to be(false)
  32. end
  33. it 'it stops being ready after all workers are busy' do
  34. p = Pool.new(5)
  35. p.start
  36. wait_mu = Mutex.new
  37. wait_cv = ConditionVariable.new
  38. wait = true
  39. job = proc do
  40. wait_mu.synchronize do
  41. wait_cv.wait(wait_mu) while wait
  42. end
  43. end
  44. 5.times do
  45. expect(p.ready_for_work?).to be(true)
  46. p.schedule(&job)
  47. end
  48. expect(p.ready_for_work?).to be(false)
  49. wait_mu.synchronize do
  50. wait = false
  51. wait_cv.broadcast
  52. end
  53. end
  54. end
  55. describe '#schedule' do
  56. it 'return if the pool is already stopped' do
  57. p = Pool.new(1)
  58. p.stop
  59. job = proc {}
  60. expect { p.schedule(&job) }.to_not raise_error
  61. end
  62. it 'adds jobs that get run by the pool' do
  63. p = Pool.new(1)
  64. p.start
  65. o, q = Object.new, Queue.new
  66. job = proc { q.push(o) }
  67. p.schedule(&job)
  68. expect(q.pop).to be(o)
  69. p.stop
  70. end
  71. end
  72. describe '#stop' do
  73. it 'works when there are no scheduled tasks' do
  74. p = Pool.new(1)
  75. expect { p.stop }.not_to raise_error
  76. end
  77. it 'stops jobs when there are long running jobs' do
  78. p = Pool.new(1)
  79. p.start
  80. wait_forever_mu = Mutex.new
  81. wait_forever_cv = ConditionVariable.new
  82. wait_forever = true
  83. job_running = Queue.new
  84. job = proc do
  85. job_running.push(Object.new)
  86. wait_forever_mu.synchronize do
  87. wait_forever_cv.wait while wait_forever
  88. end
  89. end
  90. p.schedule(&job)
  91. job_running.pop
  92. expect { p.stop }.not_to raise_error
  93. end
  94. end
  95. describe '#start' do
  96. it 'runs jobs as they are scheduled' do
  97. p = Pool.new(5)
  98. o, q = Object.new, Queue.new
  99. p.start
  100. n = 5 # arbitrary
  101. n.times do
  102. p.schedule(o, &q.method(:push))
  103. expect(q.pop).to be(o)
  104. end
  105. p.stop
  106. end
  107. end
  108. end