channel_connection_spec.rb 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. # Copyright 2015, Google Inc.
  2. # All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions are
  6. # met:
  7. #
  8. # * Redistributions of source code must retain the above copyright
  9. # notice, this list of conditions and the following disclaimer.
  10. # * Redistributions in binary form must reproduce the above
  11. # copyright notice, this list of conditions and the following disclaimer
  12. # in the documentation and/or other materials provided with the
  13. # distribution.
  14. # * Neither the name of Google Inc. nor the names of its
  15. # contributors may be used to endorse or promote products derived from
  16. # this software without specific prior written permission.
  17. #
  18. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  19. # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  20. # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  21. # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  22. # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23. # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  24. # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  25. # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  26. # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  27. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  28. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29. require 'grpc'
  30. require 'timeout'
  31. include Timeout
  32. include GRPC::Core
  # Minimal request/response message used by the echo service in this spec.
  # It carries no payload: marshalling always produces an empty string and
  # unmarshalling always produces a new, blank EchoMsg.
  class EchoMsg
    class << self
      # Serializes a message; the argument is ignored.
      #
      # @return [String] always the empty string
      def marshal(_o)
        ''
      end

      # Deserializes a message; the argument is ignored.
      #
      # @return [EchoMsg] a freshly built instance
      def unmarshal(_o)
        EchoMsg.new
      end
    end
  end
  # Echo service used by the specs below. It exposes one RPC, :an_rpc, that
  # takes an EchoMsg and hands the very same request object back.
  class EchoService
    include GRPC::GenericService

    rpc :an_rpc, EchoMsg, EchoMsg

    # Metadata objects collected from the calls handled so far.
    attr_reader :received_md

    # Any keyword arguments are remembered and merged into the output
    # metadata of every call handled by this instance.
    def initialize(**kw)
      @received_md = []
      @trailing_metadata = kw
    end

    # Handles one echo request.
    #
    # @param req the incoming request; returned untouched
    # @param call the call object; its output metadata is updated in place
    # @return the +req+ argument
    def an_rpc(req, call)
      GRPC.logger.info('echo service received a request')
      call.output_metadata.update(@trailing_metadata)
      # Only record metadata when the call actually carries some.
      unless call.metadata.nil?
        @received_md.push(call.metadata)
      end
      req
    end
  end

  # Client stub class generated from the service definition above.
  EchoStub = EchoService.rpc_stub_class
  # Boots an insecure RpcServer (pool_size 1) serving EchoService on
  # localhost and runs it on a background thread. The server and its thread
  # are kept in @srv and @server_thd so stop_server can shut them down.
  #
  # @param port the port to bind on localhost; defaults to 0
  #   (presumably "let the server pick a free port" - confirm against the
  #   add_http2_port documentation)
  # @return whatever add_http2_port reports; the specs use it as the port
  #   number to connect to
  def start_server(port = 0)
    @srv = GRPC::RpcServer.new(pool_size: 1)
    bound_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
    @srv.handle(EchoService)
    @server_thd = Thread.new do
      @srv.run
    end
    # Do not return before the server reports that it is running.
    @srv.wait_till_running
    bound_port
  end
  # Shuts down the server created by start_server and waits for its thread
  # to finish. Asserts that the server was not stopped beforehand and that
  # it reports itself stopped afterwards.
  def stop_server
    stopped_before = @srv.stopped?
    expect(stopped_before).to be(false)

    @srv.stop
    @server_thd.join

    stopped_after = @srv.stopped?
    expect(stopped_after).to be(true)
  end
  # Exercises a client channel while the server it points at is stopped and
  # then started again on the same port.
  describe 'channel connection behavior' do
    it 'the client channel handles temporary loss of a transport' do
      server_port = start_server
      stub = EchoStub.new("localhost:#{server_port}", :this_channel_is_insecure)
      request = EchoMsg.new
      expect(stub.an_rpc(request)).to be_a(EchoMsg)

      # Take the server away, pause, then bring it back on the same port.
      stop_server
      sleep 1
      # TODO(apolcyn) grabbing the same port might fail, is this stable enough?
      start_server(server_port)

      # The stub created before the restart is reused for the second call.
      expect(stub.an_rpc(request)).to be_a(EchoMsg)
      stop_server
    end

    it 'observably connects and reconnects to transient server' \
       ' when using the channel state API' do
      ready = GRPC::Core::ConnectivityStates::READY
      idle = GRPC::Core::ConnectivityStates::IDLE

      server_port = start_server
      channel = GRPC::Core::Channel.new("localhost:#{server_port}", {},
                                        :this_channel_is_insecure)
      expect(channel.connectivity_state).to be(idle)

      # Phase 1: watch (at most 20 times) until the channel reports READY.
      # NOTE(review): the `true` argument presumably asks the channel to try
      # connecting - confirm against the Channel API.
      state = channel.connectivity_state(true)
      20.times do
        break if state == ready
        channel.watch_connectivity_state(state, Time.now + 60)
        state = channel.connectivity_state(true)
      end
      expect(state).to be(ready)

      # Phase 2: with the server gone, watch (at most 20 times) until the
      # channel leaves READY. The state is read without the `true` argument.
      stop_server
      state = channel.connectivity_state
      20.times do
        break unless state == ready
        channel.watch_connectivity_state(state, Time.now + 60)
        state = channel.connectivity_state
      end
      expect(state).to_not be(ready)

      # Phase 3: server back on the same port; the channel must become READY
      # again within 20 watches.
      start_server(server_port)
      state = channel.connectivity_state(true)
      20.times do
        break if state == ready
        channel.watch_connectivity_state(state, Time.now + 60)
        state = channel.connectivity_state(true)
      end
      expect(state).to be(ready)

      stop_server
    end

    it 'concurrent watches on the same channel' do
      Timeout.timeout(180) do
        ready = GRPC::Core::ConnectivityStates::READY

        server_port = start_server
        channel = GRPC::Core::Channel.new("localhost:#{server_port}", {},
                                          :this_channel_is_insecure)
        stop_server

        # 50 threads share the one channel; each issues at most one watch,
        # and only if the channel is not READY when the thread checks.
        watchers = Array.new(50) do
          Thread.new do
            unless channel.connectivity_state(true) == ready
              channel.watch_connectivity_state(ready, Time.now + 60)
            end
          end
        end

        # Brief pause before the server comes back on the same port.
        sleep 0.01
        start_server(server_port)

        watchers.each(&:join)
        stop_server
      end
    end
  end