client_stub_spec.rb 19 KB


  1. # Copyright 2015, Google Inc.
  2. # All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions are
  6. # met:
  7. #
  8. # * Redistributions of source code must retain the above copyright
  9. # notice, this list of conditions and the following disclaimer.
  10. # * Redistributions in binary form must reproduce the above
  11. # copyright notice, this list of conditions and the following disclaimer
  12. # in the documentation and/or other materials provided with the
  13. # distribution.
  14. # * Neither the name of Google Inc. nor the names of its
  15. # contributors may be used to endorse or promote products derived from
  16. # this software without specific prior written permission.
  17. #
  18. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  19. # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  20. # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  21. # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  22. # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23. # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  24. # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  25. # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  26. # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  27. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  28. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29. require 'grpc'
  30. Thread.abort_on_exception = true
  # Runs +blk+ on a freshly spawned thread, handing it a GRPC::Notifier, and
  # then waits on that notifier before returning. The helpers in this file
  # call +notifier.notify+ once their server has been started, so callers get
  # the thread back only after that point (presumably +wait+ blocks until
  # +notify+ is called; see GRPC::Notifier to confirm).
  #
  # @yieldparam notifier [GRPC::Notifier] the notifier the block must notify
  # @return [Thread] the spawned thread, with abort_on_exception enabled
  def wakey_thread(&blk)
    notifier = GRPC::Notifier.new
    worker = Thread.new { blk.call(notifier) }
    worker.abort_on_exception = true
    notifier.wait
    worker
  end
  # Reads the test certificate material from the +testdata+ directory that
  # sits next to this file's parent directory.
  #
  # File.read is used so that each file handle is closed as soon as its
  # contents have been read, rather than being left open until GC.
  #
  # @return [Array<String>] contents of ca.pem, server1.key and server1.pem,
  #   in that order
  def load_test_certs
    test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
    files = ['ca.pem', 'server1.key', 'server1.pem']
    files.map { |f| File.read(File.join(test_root, f)) }
  end
  45. include GRPC::Core::StatusCodes
  46. include GRPC::Core::TimeConsts
  47. include GRPC::Core::CallOps
  48. describe 'ClientStub' do
  # Identity proc. It is passed as the two callable arguments of the stub
  # calls and of GRPC::ActiveCall.new (presumably the marshal/unmarshal
  # hooks), so messages travel as plain strings.
  let(:noop) { proc { |x| x } }

  # Reset per-example state: no server yet, a fixed RPC method name, and the
  # status codes the fake server replies with for success and failure.
  before(:each) do
    Thread.abort_on_exception = true
    @server = nil
    @method = 'an_rpc_method'
    @pass, @fail = OK, INTERNAL
  end

  # Shut down the server created by the example, when there is one.
  after(:each) do
    unless @server.nil?
      @server.close(from_relative_time(2))
    end
  end
  # Construction of GRPC::ClientStub with various credential and option
  # combinations. No server is started; only object creation is checked.
  describe '#new' do
    let(:fake_host) { 'localhost:0' }

    it 'can be created from a host and args' do
      opts = { channel_args: { a_channel_arg: 'an_arg' } }
      blk = proc do
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'can be created with an channel override' do
      # Hand over a real channel. The previous code passed an instance
      # variable that was never assigned, i.e. nil, so the override path was
      # not exercised at all.
      channel = GRPC::Core::Channel.new(fake_host, nil,
                                        :this_channel_is_insecure)
      opts = {
        channel_args: { a_channel_arg: 'an_arg' },
        channel_override: channel
      }
      blk = proc do
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'cannot be created with a bad channel override' do
      blk = proc do
        opts = {
          channel_args: { a_channel_arg: 'an_arg' },
          channel_override: Object.new
        }
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'cannot be created with bad credentials' do
      blk = proc do
        opts = { channel_args: { a_channel_arg: 'an_arg' } }
        GRPC::ClientStub.new(fake_host, Object.new, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'can be created with test test credentials' do
      certs = load_test_certs
      blk = proc do
        opts = {
          channel_args: {
            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
            a_channel_arg: 'an_arg'
          }
        }
        # certs[0] is the first entry returned by load_test_certs (ca.pem).
        creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
        GRPC::ClientStub.new(fake_host, creds, **opts)
      end
      expect(&blk).to_not raise_error
    end
  end
  # Unary request/response calls. The shared examples are run twice: once
  # calling the stub directly and once through a returned call operation.
  # Each nested group supplies its own +get_response+ helper.
  describe '#request_response' do
    before(:each) do
      @sent_msg, @resp = 'a_msg', 'a_reply'
    end

    shared_examples 'request response' do
      it 'should send a request to/receive a reply from a server' do
        server_port = create_test_server
        th = run_request_response(@sent_msg, @resp, @pass)
        stub = GRPC::ClientStub.new("localhost:#{server_port}",
                                    :this_channel_is_insecure)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should send metadata to the server ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass,
                                  k1: 'v1', k2: 'v2')
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should send a request when configured using an override channel' do
        server_port = create_test_server
        alt_host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass)
        ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
        stub = GRPC::ClientStub.new('ignored-host',
                                    :this_channel_is_insecure,
                                    channel_override: ch)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should raise an error if the status is not OK' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @fail)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        blk = proc { get_response(stub) }
        expect(&blk).to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should receive UNAUTHENTICATED if call credentials plugin fails' do
        server_port = create_secure_test_server
        th = run_request_response(@sent_msg, @resp, @pass)

        certs = load_test_certs
        secure_channel_creds = GRPC::Core::ChannelCredentials.new(
          certs[0], nil, nil)
        secure_stub_opts = {
          channel_args: {
            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
          }
        }
        stub = GRPC::ClientStub.new("localhost:#{server_port}",
                                    secure_channel_creds, **secure_stub_opts)

        error_message = 'Failing call credentials callback'
        failing_auth = proc do
          fail error_message
        end
        creds = GRPC::Core::CallCredentials.new(failing_auth)

        # The block form of raise_error both requires the error to be raised
        # and lets its code and details be inspected, so no manual flag or
        # begin/rescue is needed.
        fail_unauthenticated = raise_error(GRPC::BadStatus) do |e|
          expect(e.code).to eq(GRPC::Core::StatusCodes::UNAUTHENTICATED)
          expect(e.details.include?(error_message)).to be true
        end
        expect { get_response(stub, credentials: creds) }.to(
          fail_unauthenticated)

        # Kill the server thread so tests can complete
        th.kill
      end
    end

    describe 'without a call operation' do
      # Issues the unary call directly on the stub and returns its result.
      def get_response(stub, credentials: nil)
        stub.request_response(@method, @sent_msg, noop, noop,
                              metadata: { k1: 'v1', k2: 'v2' },
                              credentials: credentials)
      end

      it_behaves_like 'request response'
    end

    describe 'via a call operation' do
      # Asks the stub for an operation (return_op: true), optionally runs
      # start_call on it first, then executes it and returns the result.
      def get_response(stub, run_start_call_first: false, credentials: nil)
        op = stub.request_response(@method, @sent_msg, noop, noop,
                                   return_op: true,
                                   metadata: { k1: 'v1', k2: 'v2' },
                                   deadline: from_relative_time(2),
                                   credentials: credentials)
        expect(op).to be_a(GRPC::ActiveCall::Operation)
        op.start_call if run_start_call_first
        result = op.execute
        op.wait # make sure wait doesn't hang
        result
      end

      it_behaves_like 'request response'

      it 'sends metadata to the server ok when running start_call first' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass,
                                  k1: 'v1', k2: 'v2')
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        # Actually take the start_call path this example is named after.
        expect(get_response(stub, run_start_call_first: true)).to eq(@resp)
        th.join
      end
    end
  end
  # Client-streaming calls: several requests are sent and one reply comes
  # back. The shared examples run both directly on the stub and through a
  # returned call operation; each group defines its own +get_response+.
  describe '#client_streamer' do
    before(:each) do
      Thread.abort_on_exception = true
      port = create_test_server
      @stub = GRPC::ClientStub.new("localhost:#{port}",
                                   :this_channel_is_insecure)
      @metadata = { k1: 'v1', k2: 'v2' }
      @sent_msgs = (1..3).map { |n| 'msg_' + n.to_s }
      @resp = 'a_reply'
    end

    shared_examples 'client streaming' do
      it 'should send requests to/receive a reply from a server' do
        server_thread = run_client_streamer(@sent_msgs, @resp, @pass)
        expect(get_response(@stub)).to eq(@resp)
        server_thread.join
      end

      it 'should send metadata to the server ok' do
        server_thread = run_client_streamer(@sent_msgs, @resp, @pass,
                                            **@metadata)
        expect(get_response(@stub)).to eq(@resp)
        server_thread.join
      end

      it 'should raise an error if the status is not ok' do
        server_thread = run_client_streamer(@sent_msgs, @resp, @fail)
        expect { get_response(@stub) }.to raise_error(GRPC::BadStatus)
        server_thread.join
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        # No server thread is started here: the example only expects the
        # client-side call to raise.
        @metadata[:k3] = 3
        expect { get_response(@stub) }.to raise_error(
          ArgumentError,
          /Header values must be of type string or array/)
      end
    end

    describe 'without a call operation' do
      # Runs the client-streaming call directly and returns its result.
      def get_response(stub)
        stub.client_streamer(@method, @sent_msgs, noop, noop,
                             metadata: @metadata)
      end

      it_behaves_like 'client streaming'
    end

    describe 'via a call operation' do
      # Requests an operation from the stub, optionally runs start_call
      # first, executes it and returns the result.
      def get_response(stub, run_start_call_first: false)
        operation = stub.client_streamer(@method, @sent_msgs, noop, noop,
                                         return_op: true,
                                         metadata: @metadata)
        expect(operation).to be_a(GRPC::ActiveCall::Operation)
        operation.start_call if run_start_call_first
        reply = operation.execute
        operation.wait # make sure wait doesn't hang
        reply
      end

      it_behaves_like 'client streaming'

      it 'sends metadata to the server ok when running start_call first' do
        server_thread = run_client_streamer(@sent_msgs, @resp, @pass,
                                            **@metadata)
        reply = get_response(@stub, run_start_call_first: true)
        expect(reply).to eq(@resp)
        server_thread.join
      end
    end
  end
  # Server-streaming calls: one request is sent and an Enumerator of replies
  # comes back. The shared examples run both directly on the stub and through
  # a returned call operation; each group defines its own +get_responses+.
  describe '#server_streamer' do
    before(:each) do
      @sent_msg = 'a_msg'
      @replys = (1..3).map { |n| 'reply_' + n.to_s }
    end

    shared_examples 'server streaming' do
      it 'should send a request to/receive replies from a server' do
        port = create_test_server
        server_thread = run_server_streamer(@sent_msg, @replys, @pass)
        stub = GRPC::ClientStub.new("localhost:#{port}",
                                    :this_channel_is_insecure)
        expect(get_responses(stub).to_a).to eq(@replys)
        server_thread.join
      end

      it 'should raise an error if the status is not ok' do
        port = create_test_server
        server_thread = run_server_streamer(@sent_msg, @replys, @fail)
        stub = GRPC::ClientStub.new("localhost:#{port}",
                                    :this_channel_is_insecure)
        replies = get_responses(stub)
        expect { replies.to_a }.to raise_error(GRPC::BadStatus)
        server_thread.join
      end

      it 'should send metadata to the server ok' do
        port = create_test_server
        # The server replies with the failing status here; the metadata check
        # itself runs inside the server thread started by the helper.
        server_thread = run_server_streamer(@sent_msg, @replys, @fail,
                                            k1: 'v1', k2: 'v2')
        stub = GRPC::ClientStub.new("localhost:#{port}",
                                    :this_channel_is_insecure)
        replies = get_responses(stub)
        expect { replies.to_a }.to raise_error(GRPC::BadStatus)
        server_thread.join
      end
    end

    describe 'without a call operation' do
      # Runs the server-streaming call directly; returns the reply Enumerator.
      def get_responses(stub)
        replies = stub.server_streamer(@method, @sent_msg, noop, noop,
                                       metadata: { k1: 'v1', k2: 'v2' })
        expect(replies).to be_a(Enumerator)
        replies
      end

      it_behaves_like 'server streaming'
    end

    describe 'via a call operation' do
      after(:each) do
        @op.wait # make sure wait doesn't hang
      end

      # Requests an operation (kept in @op so the after hook can wait on it),
      # optionally runs start_call first, and returns the reply Enumerator.
      def get_responses(stub, run_start_call_first: false)
        @op = stub.server_streamer(@method, @sent_msg, noop, noop,
                                   return_op: true,
                                   metadata: { k1: 'v1', k2: 'v2' })
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        replies = @op.execute
        expect(replies).to be_a(Enumerator)
        replies
      end

      it_behaves_like 'server streaming'

      it 'should send metadata to the server ok when start_call is run first' do
        port = create_test_server
        server_thread = run_server_streamer(@sent_msg, @replys, @fail,
                                            k1: 'v1', k2: 'v2')
        stub = GRPC::ClientStub.new("localhost:#{port}",
                                    :this_channel_is_insecure)
        replies = get_responses(stub, run_start_call_first: true)
        expect { replies.to_a }.to raise_error(GRPC::BadStatus)
        server_thread.join
      end
    end
  end
  # Bidirectional streaming calls. The shared examples run both directly on
  # the stub and through a returned call operation; each group defines its
  # own +get_responses+ returning the reply Enumerator.
  describe '#bidi_streamer' do
    before(:each) do
      @sent_msgs = (1..3).map { |n| 'msg_' + n.to_s }
      @replys = (1..3).map { |n| 'reply_' + n.to_s }
      port = create_test_server
      @host = "localhost:#{port}"
    end

    shared_examples 'bidi streaming' do
      it 'supports sending all the requests first', bidi: true do
        server_thread = run_bidi_streamer_handle_inputs_first(@sent_msgs,
                                                              @replys, @pass)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        replies = get_responses(stub)
        expect(replies.to_a).to eq(@replys)
        server_thread.join
      end

      it 'supports client-initiated ping pong', bidi: true do
        server_thread = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass,
                                                         true)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        replies = get_responses(stub)
        expect(replies.to_a).to eq(@sent_msgs)
        server_thread.join
      end

      it 'supports a server-initiated ping pong', bidi: true do
        server_thread = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass,
                                                         false)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        replies = get_responses(stub)
        expect(replies.to_a).to eq(@sent_msgs)
        server_thread.join
      end
    end

    describe 'without a call operation' do
      # Runs the bidi call directly on the stub; returns the reply Enumerator.
      def get_responses(stub)
        replies = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
        expect(replies).to be_a(Enumerator)
        replies
      end

      it_behaves_like 'bidi streaming'
    end

    describe 'via a call operation' do
      after(:each) do
        @op.wait # make sure wait doesn't hang
      end

      # Requests an operation (kept in @op so the after hook can wait on it),
      # optionally runs start_call first, and returns the reply Enumerator.
      def get_responses(stub, run_start_call_first: false)
        @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
                                 return_op: true)
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        replies = @op.execute
        expect(replies).to be_a(Enumerator)
        replies
      end

      it_behaves_like 'bidi streaming'

      it 'can run start_call before executing the call' do
        server_thread = run_bidi_streamer_handle_inputs_first(@sent_msgs,
                                                              @replys, @pass)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        replies = get_responses(stub, run_start_call_first: true)
        expect(replies.to_a).to eq(@replys)
        server_thread.join
      end
    end
  end
  # Starts a server thread that handles one server-streaming call: it checks
  # the expected metadata entries and the single incoming message, sends
  # every reply, then sends +status+ ('OK' detail when it equals @pass,
  # 'NOK' otherwise).
  #
  # @param expected_input the message the server expects to read
  # @param replys the replies the server sends back, in order
  # @param status the status code to finish the call with
  # @param kw metadata key/values the server expects to have received
  # @return [Thread] the server thread (see wakey_thread)
  def run_server_streamer(expected_input, replys, status, **kw)
    expected_md = kw.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expected_md.each_pair do |key, value|
        expect(call.metadata[key.to_s]).to eq(value)
      end
      expect(call.remote_read).to eq(expected_input)
      replys.each { |reply| call.remote_send(reply) }
      detail = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, detail, true)
    end
  end
  # Starts a server thread that handles one bidi call by first reading all
  # expected inputs, then sending all replies, then sending +status+ ('OK'
  # detail when it equals @pass, 'NOK' otherwise).
  #
  # @param expected_inputs messages the server expects to read, in order
  # @param replys the replies the server sends back, in order
  # @param status the status code to finish the call with
  # @return [Thread] the server thread (see wakey_thread)
  def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
                                            status)
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expected_inputs.each do |input|
        expect(call.remote_read).to eq(input)
      end
      replys.each do |reply|
        call.remote_send(reply)
      end
      detail = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, detail, true)
    end
  end
  # Starts a server thread that handles one bidi call in lock-step: for each
  # expected input it either reads then echoes it (client starts) or sends it
  # then reads it back (server starts). It finishes by sending +status+
  # ('OK' detail when it equals @pass, 'NOK' otherwise).
  #
  # @param expected_inputs messages exchanged, in order
  # @param status the status code to finish the call with
  # @param client_starts truthy when the client sends first in each round
  # @return [Thread] the server thread (see wakey_thread)
  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expected_inputs.each do |msg|
        # When the server goes first it sends before it reads.
        call.remote_send(msg) unless client_starts
        expect(call.remote_read).to eq(msg)
        call.remote_send(msg) if client_starts
      end
      detail = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, detail, true)
    end
  end
  # Starts a server thread that handles one client-streaming call: it reads
  # every expected input, checks the expected metadata entries, sends the
  # single reply, then sends +status+ ('OK' detail when it equals @pass,
  # 'NOK' otherwise).
  #
  # @param expected_inputs messages the server expects to read, in order
  # @param resp the reply the server sends back
  # @param status the status code to finish the call with
  # @param kw metadata key/values the server expects to have received
  # @return [Thread] the server thread (see wakey_thread)
  def run_client_streamer(expected_inputs, resp, status, **kw)
    expected_md = kw.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expected_inputs.each do |input|
        expect(call.remote_read).to eq(input)
      end
      expected_md.each_pair do |key, value|
        expect(call.metadata[key.to_s]).to eq(value)
      end
      call.remote_send(resp)
      detail = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, detail, true)
    end
  end
  # Starts a server thread that handles one unary call: it reads the expected
  # input, checks the expected metadata entries, sends the reply, then sends
  # +status+ ('OK' detail when it equals @pass, 'NOK' otherwise).
  #
  # @param expected_input the message the server expects to read
  # @param resp the reply the server sends back
  # @param status the status code to finish the call with
  # @param kw metadata key/values the server expects to have received
  # @return [Thread] the server thread (see wakey_thread)
  def run_request_response(expected_input, resp, status, **kw)
    expected_md = kw.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expect(call.remote_read).to eq(expected_input)
      expected_md.each_pair do |key, value|
        expect(call.metadata[key.to_s]).to eq(value)
      end
      call.remote_send(resp)
      detail = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, detail, true)
    end
  end
  # Creates a server in @server and adds an HTTP/2 port on '0.0.0.0:0' with
  # server credentials built from the test key and certificate chain.
  #
  # @return the value of add_http2_port, which callers use as the port number
  def create_secure_test_server
    _ca, private_key, cert_chain = load_test_certs
    key_cert_pairs = [{ private_key: private_key, cert_chain: cert_chain }]
    server_creds = GRPC::Core::ServerCredentials.new(nil, key_cert_pairs,
                                                     false)
    @server = GRPC::Core::Server.new(nil)
    @server.add_http2_port('0.0.0.0:0', server_creds)
  end
  # Creates a server in @server and adds an insecure HTTP/2 port on
  # '0.0.0.0:0'.
  #
  # @return the value of add_http2_port, which callers use as the port number
  def create_test_server
    @server = GRPC::Core::Server.new(nil)
    port = @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
    port
  end
  # Starts @server, notifies +notifier+ so that the thread waiting in
  # wakey_thread can proceed, then accepts one incoming call and wraps it in
  # a GRPC::ActiveCall for the server-side helpers to drive.
  #
  # @param notifier the notifier handed in by wakey_thread
  # @return [GRPC::ActiveCall] the server side of the received call
  def expect_server_to_be_invoked(notifier)
    @server.start
    notifier.notify(nil)
    incoming = @server.request_call
    server_call = incoming.call
    # Copy the received metadata onto the call so helpers can check it via
    # the active call's #metadata.
    server_call.metadata = incoming.metadata
    server_call.run_batch(SEND_INITIAL_METADATA => nil)
    GRPC::ActiveCall.new(server_call, noop, noop, INFINITE_FUTURE,
                         metadata_received: true)
  end
  492. end