client_stub_spec.rb 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. # Copyright 2015, Google Inc.
  2. # All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions are
  6. # met:
  7. #
  8. # * Redistributions of source code must retain the above copyright
  9. # notice, this list of conditions and the following disclaimer.
  10. # * Redistributions in binary form must reproduce the above
  11. # copyright notice, this list of conditions and the following disclaimer
  12. # in the documentation and/or other materials provided with the
  13. # distribution.
  14. # * Neither the name of Google Inc. nor the names of its
  15. # contributors may be used to endorse or promote products derived from
  16. # this software without specific prior written permission.
  17. #
  18. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  19. # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  20. # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  21. # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  22. # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23. # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  24. # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  25. # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  26. # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  27. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  28. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29. require 'grpc'
  30. Thread.abort_on_exception = true
  # Runs the given block on a new thread, handing it a fresh GRPC::Notifier,
  # and blocks the caller on that notifier's #wait before returning.
  #
  # @yieldparam notifier [GRPC::Notifier] the notifier the caller waits on
  # @return [Thread] the thread that is running the block
  def wakey_thread(&blk)
    notifier = GRPC::Notifier.new
    worker = Thread.new { blk.call(notifier) }
    # Surface failures inside the worker instead of letting them pass
    # silently.
    worker.abort_on_exception = true
    notifier.wait
    worker
  end
  # Loads the certificate fixtures used by the credential examples.
  #
  # The fixtures live in a 'testdata' directory located in the parent of the
  # directory that contains this file.
  #
  # @return [Array<String>] the contents of ca.pem, server1.key and
  #   server1.pem, in that order
  def load_test_certs
    test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
    files = ['ca.pem', 'server1.key', 'server1.pem']
    # File.read opens, reads and closes each file; File.open(...).read would
    # leave the handle open until it is garbage collected.
    files.map { |f| File.read(File.join(test_root, f)) }
  end
  45. include GRPC::Core::StatusCodes
  46. include GRPC::Core::TimeConsts
  47. include GRPC::Core::CallOps
  48. describe 'ClientStub' do
  # Identity proc handed to the stub calls and to ActiveCall.new throughout
  # this spec.
  let(:noop) { proc { |x| x } }

  # Reset the per-example state: no server yet, a fixed method name and the
  # two status codes the examples send.
  before(:each) do
    Thread.abort_on_exception = true
    @method = 'an_rpc_method'
    @pass = OK
    @fail = INTERNAL
    @server = nil
  end

  # Close the server if the example created one.
  after(:each) do
    unless @server.nil?
      @server.close(from_relative_time(2))
    end
  end
  # Construction of GRPC::ClientStub with various channel args, channel
  # overrides and credentials.
  describe '#new' do
    let(:fake_host) { 'localhost:0' }

    it 'can be created from a host and args' do
      opts = { channel_args: { a_channel_arg: 'an_arg' } }
      blk = proc do
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'can be created with an channel override' do
      # Build a real channel for the override. An instance variable that is
      # never assigned evaluates to nil, which would not exercise the
      # override at all.
      ch = GRPC::Core::Channel.new(fake_host, nil, :this_channel_is_insecure)
      opts = {
        channel_args: { a_channel_arg: 'an_arg' },
        channel_override: ch
      }
      blk = proc do
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'cannot be created with a bad channel override' do
      blk = proc do
        opts = {
          channel_args: { a_channel_arg: 'an_arg' },
          channel_override: Object.new
        }
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'cannot be created with bad credentials' do
      blk = proc do
        opts = { channel_args: { a_channel_arg: 'an_arg' } }
        GRPC::ClientStub.new(fake_host, Object.new, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'can be created with test test credentials' do
      certs = load_test_certs
      blk = proc do
        opts = {
          channel_args: {
            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
            a_channel_arg: 'an_arg'
          }
        }
        # Only the first fixture (ca.pem) is used to build the credentials.
        creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
        GRPC::ClientStub.new(fake_host, creds, **opts)
      end
      expect(&blk).to_not raise_error
    end
  end
  # Unary RPCs: one request message out, one reply message back.
  describe '#request_response' do
    before(:each) do
      @sent_msg = 'a_msg'
      @resp = 'a_reply'
    end

    # Examples shared by both invocation styles. Each including group
    # supplies its own get_response(stub).
    shared_examples 'request response' do
      it 'should send a request to/receive a reply from a server' do
        port = create_test_server
        server_thread = run_request_response(@sent_msg, @resp, @pass)
        stub = GRPC::ClientStub.new("localhost:#{port}",
                                    :this_channel_is_insecure)
        expect(get_response(stub)).to eq(@resp)
        server_thread.join
      end

      it 'should send metadata to the server ok' do
        port = create_test_server
        host = "localhost:#{port}"
        server_thread = run_request_response(@sent_msg, @resp, @pass,
                                             k1: 'v1', k2: 'v2')
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect(get_response(stub)).to eq(@resp)
        server_thread.join
      end

      it 'should send a request when configured using an override channel' do
        port = create_test_server
        alt_host = "localhost:#{port}"
        server_thread = run_request_response(@sent_msg, @resp, @pass)
        channel = GRPC::Core::Channel.new(alt_host, nil,
                                          :this_channel_is_insecure)
        stub = GRPC::ClientStub.new('ignored-host',
                                    :this_channel_is_insecure,
                                    channel_override: channel)
        expect(get_response(stub)).to eq(@resp)
        server_thread.join
      end

      it 'should raise an error if the status is not OK' do
        port = create_test_server
        host = "localhost:#{port}"
        server_thread = run_request_response(@sent_msg, @resp, @fail)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect { get_response(stub) }.to raise_error(GRPC::BadStatus)
        server_thread.join
      end
    end

    describe 'without a call operation' do
      # Issues the call directly and returns whatever the stub returns.
      def get_response(stub)
        stub.request_response(@method, @sent_msg, noop, noop,
                              metadata: { k1: 'v1', k2: 'v2' })
      end

      it_behaves_like 'request response'
    end

    describe 'via a call operation' do
      # Asks for an operation first, then executes it to get the response.
      def get_response(stub)
        operation = stub.request_response(@method, @sent_msg, noop, noop,
                                          return_op: true,
                                          metadata: { k1: 'v1', k2: 'v2' },
                                          deadline: from_relative_time(2))
        expect(operation).to be_a(GRPC::ActiveCall::Operation)
        operation.execute
      end

      it_behaves_like 'request response'
    end
  end
  # Client-streaming RPCs: several request messages out, one reply back.
  describe '#client_streamer' do
    # Examples shared by both invocation styles. Each including group
    # supplies its own get_response(stub).
    shared_examples 'client streaming' do
      before(:each) do
        port = create_test_server
        @stub = GRPC::ClientStub.new("localhost:#{port}",
                                     :this_channel_is_insecure)
        @metadata = { k1: 'v1', k2: 'v2' }
        @sent_msgs = (1..3).map { |n| "msg_#{n}" }
        @resp = 'a_reply'
      end

      it 'should send requests to/receive a reply from a server' do
        server_thread = run_client_streamer(@sent_msgs, @resp, @pass)
        expect(get_response(@stub)).to eq(@resp)
        server_thread.join
      end

      it 'should send metadata to the server ok' do
        server_thread = run_client_streamer(@sent_msgs, @resp, @pass,
                                            **@metadata)
        expect(get_response(@stub)).to eq(@resp)
        server_thread.join
      end

      it 'should raise an error if the status is not ok' do
        server_thread = run_client_streamer(@sent_msgs, @resp, @fail)
        expect { get_response(@stub) }.to raise_error(GRPC::BadStatus)
        server_thread.join
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        # An Integer metadata value is what this example expects the stub
        # to reject.
        @metadata[:k3] = 3
        blk = proc { get_response(@stub) }
        expect(&blk).to raise_error(
          ArgumentError,
          /Header values must be of type string or array/
        )
      end
    end

    describe 'without a call operation' do
      # Issues the call directly and returns whatever the stub returns.
      def get_response(stub)
        stub.client_streamer(@method, @sent_msgs, noop, noop,
                             metadata: @metadata)
      end

      it_behaves_like 'client streaming'
    end

    describe 'via a call operation' do
      # Asks for an operation first, then executes it to get the response.
      def get_response(stub)
        operation = stub.client_streamer(@method, @sent_msgs, noop, noop,
                                         return_op: true,
                                         metadata: @metadata)
        expect(operation).to be_a(GRPC::ActiveCall::Operation)
        operation.execute
      end

      it_behaves_like 'client streaming'
    end
  end
  # Server-streaming RPCs: one request message out, several replies back.
  describe '#server_streamer' do
    # Examples shared by both invocation styles. Each including group
    # supplies its own get_responses(stub), which returns an Enumerator.
    shared_examples 'server streaming' do
      before(:each) do
        @sent_msg = 'a_msg'
        @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
      end

      it 'should send a request to/receive replies from a server' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @pass)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect(get_responses(stub).collect { |r| r }).to eq(@replys)
        th.join
      end

      it 'should raise an error if the status is not ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @fail)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        e = get_responses(stub)
        # The enumerator is drained inside the expectation block, which is
        # where this example expects the bad status to be raised.
        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should send metadata to the server ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        # Use the passing status and check the replies, as the matching
        # metadata examples for the other call types do. The server thread
        # checks the received metadata against k1/k2.
        th = run_server_streamer(@sent_msg, @replys, @pass,
                                 k1: 'v1', k2: 'v2')
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect(get_responses(stub).collect { |r| r }).to eq(@replys)
        th.join
      end
    end

    describe 'without a call operation' do
      # Issues the call directly and returns the resulting Enumerator.
      def get_responses(stub)
        e = stub.server_streamer(@method, @sent_msg, noop, noop,
                                 metadata: { k1: 'v1', k2: 'v2' })
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'server streaming'
    end

    describe 'via a call operation' do
      # Asks for an operation first, then executes it to get the Enumerator.
      def get_responses(stub)
        op = stub.server_streamer(@method, @sent_msg, noop, noop,
                                  return_op: true,
                                  metadata: { k1: 'v1', k2: 'v2' })
        expect(op).to be_a(GRPC::ActiveCall::Operation)
        e = op.execute
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'server streaming'
    end
  end
  # Bidirectional-streaming RPCs: messages flow both ways on one call.
  describe '#bidi_streamer' do
    # Examples shared by both invocation styles. Each including group
    # supplies its own get_responses(stub), which returns an Enumerator.
    shared_examples 'bidi streaming' do
      before(:each) do
        @sent_msgs = (1..3).map { |n| "msg_#{n}" }
        @replys = (1..3).map { |n| "reply_#{n}" }
        port = create_test_server
        @host = "localhost:#{port}"
      end

      it 'supports sending all the requests first', bidi: true do
        server_thread = run_bidi_streamer_handle_inputs_first(@sent_msgs,
                                                              @replys,
                                                              @pass)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        responses = get_responses(stub)
        expect(responses.collect { |r| r }).to eq(@replys)
        server_thread.join
      end

      it 'supports client-initiated ping pong', bidi: true do
        server_thread = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass,
                                                         true)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        responses = get_responses(stub)
        # The server echoes each message, so the replies equal the requests.
        expect(responses.collect { |r| r }).to eq(@sent_msgs)
        server_thread.join
      end

      it 'supports a server-initiated ping pong', bidi: true do
        server_thread = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass,
                                                         false)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        responses = get_responses(stub)
        expect(responses.collect { |r| r }).to eq(@sent_msgs)
        server_thread.join
      end
    end

    describe 'without a call operation' do
      # Issues the call directly and returns the resulting Enumerator.
      def get_responses(stub)
        responses = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
        expect(responses).to be_a(Enumerator)
        responses
      end

      it_behaves_like 'bidi streaming'
    end

    describe 'via a call operation' do
      # Asks for an operation first, then executes it to get the Enumerator.
      def get_responses(stub)
        operation = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
                                       return_op: true)
        expect(operation).to be_a(GRPC::ActiveCall::Operation)
        responses = operation.execute
        expect(responses).to be_a(Enumerator)
        responses
      end

      it_behaves_like 'bidi streaming'
    end
  end
  # Starts a server-side thread for a server-streaming call. The thread
  # checks the received metadata and the single request, sends every reply,
  # then sends the status.
  #
  # @param expected_input the request message the server should read
  # @param replys [Array] the messages the server sends back, in order
  # @param status the status code to finish the call with
  # @param kw metadata entries the server expects to have received
  # @return [Thread] the thread running the server side
  def run_server_streamer(expected_input, replys, status, **kw)
    wanted_metadata = kw.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      wanted_metadata.each do |key, value|
        # Expected keys are given as symbols; they are looked up as strings.
        expect(call.metadata[key.to_s]).to eq(value)
      end
      expect(call.remote_read).to eq(expected_input)
      replys.each { |reply| call.remote_send(reply) }
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true)
    end
  end
  # Starts a server-side thread for a bidi call that reads every expected
  # request before sending any reply, then sends the status.
  #
  # @param expected_inputs [Array] the messages the server should read
  # @param replys [Array] the messages the server sends back, in order
  # @param status the status code to finish the call with
  # @return [Thread] the thread running the server side
  def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
                                            status)
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expected_inputs.each do |input|
        expect(call.remote_read).to eq(input)
      end
      replys.each do |reply|
        call.remote_send(reply)
      end
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true)
    end
  end
  # Starts a server-side thread for a bidi call that exchanges one message
  # at a time, then sends the status.
  #
  # @param expected_inputs [Array] the messages exchanged, in order
  # @param status the status code to finish the call with
  # @param client_starts [Boolean] when truthy the server reads each message
  #   before echoing it; otherwise it sends the message first and then reads
  # @return [Thread] the thread running the server side
  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expected_inputs.each do |msg|
        # Exactly one of the two sends runs, either before or after the read.
        call.remote_send(msg) unless client_starts
        expect(call.remote_read).to eq(msg)
        call.remote_send(msg) if client_starts
      end
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true)
    end
  end
  # Starts a server-side thread for a client-streaming call. The thread reads
  # every expected request, checks the received metadata, sends the single
  # reply, then sends the status.
  #
  # @param expected_inputs [Array] the messages the server should read
  # @param resp the reply the server sends back
  # @param status the status code to finish the call with
  # @param kw metadata entries the server expects to have received
  # @return [Thread] the thread running the server side
  def run_client_streamer(expected_inputs, resp, status, **kw)
    wanted_metadata = kw.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expected_inputs.each do |input|
        expect(call.remote_read).to eq(input)
      end
      wanted_metadata.each do |key, value|
        # Expected keys are given as symbols; they are looked up as strings.
        expect(call.metadata[key.to_s]).to eq(value)
      end
      call.remote_send(resp)
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true)
    end
  end
  # Starts a server-side thread for a unary call. The thread reads the
  # request, checks the received metadata, sends the reply, then sends the
  # status.
  #
  # @param expected_input the request message the server should read
  # @param resp the reply the server sends back
  # @param status the status code to finish the call with
  # @param kw metadata entries the server expects to have received
  # @return [Thread] the thread running the server side
  def run_request_response(expected_input, resp, status, **kw)
    wanted_metadata = kw.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expect(call.remote_read).to eq(expected_input)
      wanted_metadata.each do |key, value|
        # Expected keys are given as symbols; they are looked up as strings.
        expect(call.metadata[key.to_s]).to eq(value)
      end
      call.remote_send(resp)
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true)
    end
  end
  # Creates a new core server, stores it in @server, and adds an insecure
  # port on '0.0.0.0:0'.
  #
  # @return whatever add_http2_port returns; the examples interpolate it
  #   into the host string as the port
  def create_test_server
    server = GRPC::Core::Server.new(nil)
    @server = server
    server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
  end
  # Starts @server, signals the notifier, takes the next incoming call and
  # wraps it in a GRPC::ActiveCall for the server-side helpers to drive.
  #
  # @param notifier the notifier to signal once the server has been started
  # @return [GRPC::ActiveCall] the wrapped server-side call
  def expect_server_to_be_invoked(notifier)
    @server.start
    # Signal before requesting the call, so the thread blocked in
    # wakey_thread can go on to issue the client request.
    notifier.notify(nil)
    rpc = @server.request_call
    server_call = rpc.call
    server_call.metadata = rpc.metadata
    server_call.run_batch(SEND_INITIAL_METADATA => nil)
    GRPC::ActiveCall.new(server_call, noop, noop, INFINITE_FUTURE,
                         metadata_received: true)
  end
  405. end