channel_connection_spec.rb 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. require 'spec_helper'
  15. require 'timeout'
  16. include Timeout
  17. include GRPC::Core
  18. include GRPC::Spec::Helpers
  19. def start_server(port = 0)
  20. @srv = new_rpc_server_for_testing(pool_size: 1)
  21. server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
  22. @srv.handle(EchoService)
  23. @server_thd = Thread.new { @srv.run }
  24. @srv.wait_till_running
  25. server_port
  26. end
  27. def stop_server
  28. expect(@srv.stopped?).to be(false)
  29. @srv.stop
  30. @server_thd.join
  31. expect(@srv.stopped?).to be(true)
  32. end
  33. describe 'channel connection behavior' do
  34. it 'the client channel handles temporary loss of a transport' do
  35. port = start_server
  36. stub = EchoStub.new("localhost:#{port}", :this_channel_is_insecure)
  37. req = EchoMsg.new
  38. expect(stub.an_rpc(req)).to be_a(EchoMsg)
  39. stop_server
  40. sleep 1
  41. # TODO(apolcyn) grabbing the same port might fail, is this stable enough?
  42. start_server(port)
  43. expect(stub.an_rpc(req)).to be_a(EchoMsg)
  44. stop_server
  45. end
  46. it 'observably connects and reconnects to transient server' \
  47. ' when using the channel state API' do
  48. port = start_server
  49. ch = GRPC::Core::Channel.new("localhost:#{port}", {},
  50. :this_channel_is_insecure)
  51. expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE)
  52. state = ch.connectivity_state(true)
  53. count = 0
  54. while count < 20 && state != GRPC::Core::ConnectivityStates::READY
  55. ch.watch_connectivity_state(state, Time.now + 60)
  56. state = ch.connectivity_state(true)
  57. count += 1
  58. end
  59. expect(state).to be(GRPC::Core::ConnectivityStates::READY)
  60. stop_server
  61. state = ch.connectivity_state
  62. count = 0
  63. while count < 20 && state == GRPC::Core::ConnectivityStates::READY
  64. ch.watch_connectivity_state(state, Time.now + 60)
  65. state = ch.connectivity_state
  66. count += 1
  67. end
  68. expect(state).to_not be(GRPC::Core::ConnectivityStates::READY)
  69. start_server(port)
  70. state = ch.connectivity_state(true)
  71. count = 0
  72. while count < 20 && state != GRPC::Core::ConnectivityStates::READY
  73. ch.watch_connectivity_state(state, Time.now + 60)
  74. state = ch.connectivity_state(true)
  75. count += 1
  76. end
  77. expect(state).to be(GRPC::Core::ConnectivityStates::READY)
  78. stop_server
  79. end
  80. it 'concurrent watches on the same channel' do
  81. timeout(180) do
  82. port = start_server
  83. ch = GRPC::Core::Channel.new("localhost:#{port}", {},
  84. :this_channel_is_insecure)
  85. stop_server
  86. thds = []
  87. 50.times do
  88. thds << Thread.new do
  89. while ch.connectivity_state(true) != ConnectivityStates::READY
  90. ch.watch_connectivity_state(
  91. ConnectivityStates::READY, Time.now + 60)
  92. break
  93. end
  94. end
  95. end
  96. sleep 0.01
  97. start_server(port)
  98. thds.each(&:join)
  99. stop_server
  100. end
  101. end
  102. end