client_stub_spec.rb 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. require 'spec_helper'
  15. Thread.abort_on_exception = true
  # Runs the given block on a new thread, passing it a fresh GRPC::Notifier,
  # and waits on that notifier before returning.
  #
  # @yieldparam notifier [GRPC::Notifier] the notifier this method waits on
  # @return [Thread] the started thread, with abort_on_exception enabled
  def wakey_thread(&blk)
    notifier = GRPC::Notifier.new
    worker = Thread.new { blk.call(notifier) }
    worker.abort_on_exception = true
    # Do not hand the thread back until the notifier wait completes.
    notifier.wait
    worker
  end
  # Reads the test certificate fixtures from the 'testdata' directory located
  # one level above this file's directory.
  #
  # @return [Array<String>] contents of ca.pem, server1.key and server1.pem,
  #   in that order
  def load_test_certs
    test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
    files = ['ca.pem', 'server1.key', 'server1.pem']
    # File.read closes the file once it has been read; File.open(...).read
    # left the handle open until garbage collection.
    files.map { |f| File.read(File.join(test_root, f)) }
  end
  30. include GRPC::Core::StatusCodes
  31. include GRPC::Core::TimeConsts
  32. include GRPC::Core::CallOps
  33. # check that methods on a finished/closed call don't crash
  # Exercises an operation view whose call has already finished.
  #
  # The caller's block receives the result of op_view.execute and should try
  # to read from it (e.g. iterate a response stream).
  #
  # @param op_view the operation view to probe
  # @param expected_metadata metadata the view is expected to report
  # @param expected_trailing_metadata trailing metadata the view is expected
  #   to report
  # @raise [RuntimeError] when called without a block
  def check_op_view_of_finished_client_call(op_view,
                                            expected_metadata,
                                            expected_trailing_metadata)
    fail('need something to attempt reads') unless block_given?

    # Executing again and reading the result must raise a CallError.
    expect { yield op_view.execute }.to raise_error(GRPC::Core::CallError)
    expect { op_view.start_call }.to raise_error(RuntimeError)

    sanity_check_values_of_accessors(
      op_view, expected_metadata, expected_trailing_metadata
    )

    # These operations must not raise on a finished call.
    expect do
      op_view.wait
      op_view.cancel
      op_view.write_flag = 1
    end.not_to raise_error
  end
  # Asserts that the accessors of an operation view report an OK call.
  #
  # @param op_view the operation view under inspection
  # @param expected_metadata value op_view.metadata must equal
  # @param expected_trailing_metadata value op_view.trailing_metadata (and
  #   the metadata of its status) must equal
  def sanity_check_values_of_accessors(op_view,
                                       expected_metadata,
                                       expected_trailing_metadata)
    ok_status = Struct::Status.new.tap do |status|
      status.code = 0
      status.details = 'OK'
      status.metadata = expected_trailing_metadata
    end

    expect(op_view.status).to eq(ok_status)
    expect(op_view.metadata).to eq(expected_metadata)
    expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)
    expect(op_view.cancelled?).to be(false)
    expect(op_view.write_flag).to be(nil)

    # The deadline attribute of a call can be either
    # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
    # TODO: fix so that the accessor always returns the same type.
    deadline_type_ok = [GRPC::Core::TimeSpec, Time].any? do |klass|
      op_view.deadline.is_a?(klass)
    end
    expect(deadline_type_ok).to be(true)
  end
  # Invokes set_input_stream_done and then set_output_stream_done on the
  # given server call. `send` is used, so the methods may be non-public.
  #
  # @param active_server_call the server-side call to finish
  # @return whatever set_output_stream_done returns
  def close_active_server_call(active_server_call)
    finishers = %i[set_input_stream_done set_output_stream_done]
    finishers.map { |finisher| active_server_call.send(finisher) }.last
  end
  76. describe 'ClientStub' do # rubocop:disable Metrics/BlockLength
  77. let(:noop) { proc { |x| x } }
  78. before(:each) do
  79. Thread.abort_on_exception = true
  80. @server = nil
  81. @method = 'an_rpc_method'
  82. @pass = OK
  83. @fail = INTERNAL
  84. @metadata = { k1: 'v1', k2: 'v2' }
  85. end
  86. after(:each) do
  87. unless @server.nil?
  88. @server.shutdown_and_notify(from_relative_time(2))
  89. @server.close
  90. end
  91. end
  92. describe '#new' do
  93. let(:fake_host) { 'localhost:0' }
  94. it 'can be created from a host and args' do
  95. opts = { channel_args: { a_channel_arg: 'an_arg' } }
  96. blk = proc do
  97. GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
  98. end
  99. expect(&blk).not_to raise_error
  100. end
  101. it 'can be created with an channel override' do
  102. opts = {
  103. channel_args: { a_channel_arg: 'an_arg' },
  104. channel_override: @ch
  105. }
  106. blk = proc do
  107. GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
  108. end
  109. expect(&blk).not_to raise_error
  110. end
  111. it 'cannot be created with a bad channel override' do
  112. blk = proc do
  113. opts = {
  114. channel_args: { a_channel_arg: 'an_arg' },
  115. channel_override: Object.new
  116. }
  117. GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
  118. end
  119. expect(&blk).to raise_error
  120. end
  121. it 'cannot be created with bad credentials' do
  122. blk = proc do
  123. opts = { channel_args: { a_channel_arg: 'an_arg' } }
  124. GRPC::ClientStub.new(fake_host, Object.new, **opts)
  125. end
  126. expect(&blk).to raise_error
  127. end
  128. it 'can be created with test test credentials' do
  129. certs = load_test_certs
  130. blk = proc do
  131. opts = {
  132. channel_args: {
  133. GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
  134. a_channel_arg: 'an_arg'
  135. }
  136. }
  137. creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
  138. GRPC::ClientStub.new(fake_host, creds, **opts)
  139. end
  140. expect(&blk).to_not raise_error
  141. end
  142. end
  143. describe '#request_response', request_response: true do
  144. before(:each) do
  145. @sent_msg, @resp = 'a_msg', 'a_reply'
  146. end
  147. shared_examples 'request response' do
  148. it 'should send a request to/receive a reply from a server' do
  149. server_port = create_test_server
  150. th = run_request_response(@sent_msg, @resp, @pass)
  151. stub = GRPC::ClientStub.new("localhost:#{server_port}",
  152. :this_channel_is_insecure)
  153. expect(get_response(stub)).to eq(@resp)
  154. th.join
  155. end
  # Runs one request/response round trip in which the server side is told to
  # expect `md` and the client sends `md` as its call metadata.
  #
  # @param md [Hash] metadata to send and to expect on the server
  def metadata_test(md)
    port = create_test_server
    server_thread = run_request_response(@sent_msg, @resp, @pass,
                                         expected_metadata: md)
    stub = GRPC::ClientStub.new("localhost:#{port}",
                                :this_channel_is_insecure)
    # get_response reads @metadata, so it must be set before the call.
    @metadata = md
    expect(get_response(stub)).to eq(@resp)
    server_thread.join
  end
  166. it 'should send metadata to the server ok' do
  167. metadata_test(k1: 'v1', k2: 'v2')
  168. end
  169. # these tests mostly try to exercise when md might be allocated
  170. # instead of inlined
  it 'should send metadata with multiple large md to the server ok' do
    # %w splits on whitespace only: quotes and commas inside it become part
    # of each element, so the values are listed bare.
    val_array = %w(
      00000000000000000000000000000000000000000000000000000000000000
      11111111111111111111111111111111111111111111111111111111111111
      22222222222222222222222222222222222222222222222222222222222222
    )
    md = {
      k1: val_array,
      k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
      k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
      k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc',
      keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5',
      'k66666666666666666666666666666666666666666666666666666' => 'v6',
      'k77777777777777777777777777777777777777777777777777777' => 'v7',
      'k88888888888888888888888888888888888888888888888888888' => 'v8'
    }
    metadata_test(md)
  end
  189. it 'should send a request when configured using an override channel' do
  190. server_port = create_test_server
  191. alt_host = "localhost:#{server_port}"
  192. th = run_request_response(@sent_msg, @resp, @pass)
  193. ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
  194. stub = GRPC::ClientStub.new('ignored-host',
  195. :this_channel_is_insecure,
  196. channel_override: ch)
  197. expect(get_response(stub)).to eq(@resp)
  198. th.join
  199. end
  200. it 'should raise an error if the status is not OK' do
  201. server_port = create_test_server
  202. host = "localhost:#{server_port}"
  203. th = run_request_response(@sent_msg, @resp, @fail)
  204. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  205. blk = proc { get_response(stub) }
  206. expect(&blk).to raise_error(GRPC::BadStatus)
  207. th.join
  208. end
  209. it 'should receive UNAVAILABLE if call credentials plugin fails' do
  210. server_port = create_secure_test_server
  211. server_started_notifier = GRPC::Notifier.new
  212. th = Thread.new do
  213. @server.start
  214. server_started_notifier.notify(nil)
  215. # Poll on the server so that the client connection can proceed.
  216. # We don't expect the server to actually accept a call though.
  217. expect { @server.request_call }.to raise_error(GRPC::Core::CallError)
  218. end
  219. server_started_notifier.wait
  220. certs = load_test_certs
  221. secure_channel_creds = GRPC::Core::ChannelCredentials.new(
  222. certs[0], nil, nil)
  223. secure_stub_opts = {
  224. channel_args: {
  225. GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
  226. }
  227. }
  228. stub = GRPC::ClientStub.new("localhost:#{server_port}",
  229. secure_channel_creds, **secure_stub_opts)
  230. error_message = 'Failing call credentials callback'
  231. failing_auth = proc do
  232. fail error_message
  233. end
  234. creds = GRPC::Core::CallCredentials.new(failing_auth)
  235. unavailable_error_occurred = false
  236. begin
  237. get_response(stub, credentials: creds)
  238. rescue GRPC::Unavailable => e
  239. unavailable_error_occurred = true
  240. expect(e.details.include?(error_message)).to be true
  241. end
  242. expect(unavailable_error_occurred).to eq(true)
  243. @server.shutdown_and_notify(Time.now + 3)
  244. th.join
  245. @server.close
  246. end
  247. it 'should raise ArgumentError if metadata contains invalid values' do
  248. @metadata.merge!(k3: 3)
  249. server_port = create_test_server
  250. host = "localhost:#{server_port}"
  251. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  252. expect do
  253. get_response(stub)
  254. end.to raise_error(ArgumentError,
  255. /Header values must be of type string or array/)
  256. end
  257. end
  258. describe 'without a call operation' do
  # Logs the credentials and performs the request/response call directly on
  # the stub, without asking for an operation object.
  #
  # @param stub the client stub to call
  # @param credentials call credentials to pass through (nil by default)
  # @return whatever stub.request_response returns
  def get_response(stub, credentials: nil)
    GRPC.logger.info(credentials.inspect)
    call_opts = { metadata: @metadata, credentials: credentials }
    stub.request_response(@method, @sent_msg, noop, noop, **call_opts)
  end
  265. it_behaves_like 'request response'
  266. end
  267. describe 'via a call operation' do
  268. after(:each) do
  269. # make sure op.wait doesn't hang, even if there's a bad status
  270. @op.wait
  271. end
  # Performs the request/response call through an operation object, which is
  # kept in @op so the surrounding examples and the `after` hook can use it.
  #
  # @param stub the client stub to call
  # @param run_start_call_first [Boolean] call @op.start_call before execute
  # @param credentials call credentials to pass through (nil by default)
  # @return the result of @op.execute
  def get_response(stub, run_start_call_first: false, credentials: nil)
    @op = stub.request_response(
      @method, @sent_msg, noop, noop,
      return_op: true,
      metadata: @metadata,
      deadline: from_relative_time(2),
      credentials: credentials
    )
    expect(@op).to be_a(GRPC::ActiveCall::Operation)
    @op.start_call if run_start_call_first
    @op.execute
  end
  283. it_behaves_like 'request response'
  284. def run_op_view_metadata_test(run_start_call_first)
  285. server_port = create_test_server
  286. host = "localhost:#{server_port}"
  287. @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
  288. @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
  289. th = run_request_response(
  290. @sent_msg, @resp, @pass,
  291. expected_metadata: @metadata,
  292. server_initial_md: @server_initial_md,
  293. server_trailing_md: @server_trailing_md)
  294. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  295. expect(
  296. get_response(stub,
  297. run_start_call_first: run_start_call_first)).to eq(@resp)
  298. th.join
  299. end
  300. it 'sends metadata to the server ok when running start_call first' do
  301. run_op_view_metadata_test(true)
  302. check_op_view_of_finished_client_call(
  303. @op, @server_initial_md, @server_trailing_md
  304. ) { |r| GRPC.logger.info(r) }
  305. end
  306. it 'does not crash when used after the call has been finished' do
  307. run_op_view_metadata_test(false)
  308. check_op_view_of_finished_client_call(
  309. @op, @server_initial_md, @server_trailing_md
  310. ) { |r| GRPC.logger.info(r) }
  311. end
  312. end
  313. end
  314. describe '#client_streamer', client_streamer: true do
  315. before(:each) do
  316. Thread.abort_on_exception = true
  317. server_port = create_test_server
  318. host = "localhost:#{server_port}"
  319. @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  320. @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
  321. @resp = 'a_reply'
  322. end
  323. shared_examples 'client streaming' do
  324. it 'should send requests to/receive a reply from a server' do
  325. th = run_client_streamer(@sent_msgs, @resp, @pass)
  326. expect(get_response(@stub)).to eq(@resp)
  327. th.join
  328. end
  329. it 'should send metadata to the server ok' do
  330. th = run_client_streamer(@sent_msgs, @resp, @pass,
  331. expected_metadata: @metadata)
  332. expect(get_response(@stub)).to eq(@resp)
  333. th.join
  334. end
  335. it 'should raise an error if the status is not ok' do
  336. th = run_client_streamer(@sent_msgs, @resp, @fail)
  337. blk = proc { get_response(@stub) }
  338. expect(&blk).to raise_error(GRPC::BadStatus)
  339. th.join
  340. end
  341. it 'should raise ArgumentError if metadata contains invalid values' do
  342. @metadata.merge!(k3: 3)
  343. expect do
  344. get_response(@stub)
  345. end.to raise_error(ArgumentError,
  346. /Header values must be of type string or array/)
  347. end
  348. end
  349. describe 'without a call operation' do
  350. def get_response(stub)
  351. stub.client_streamer(@method, @sent_msgs, noop, noop,
  352. metadata: @metadata)
  353. end
  354. it_behaves_like 'client streaming'
  355. end
  356. describe 'via a call operation' do
  357. after(:each) do
  358. # make sure op.wait doesn't hang, even if there's a bad status
  359. @op.wait
  360. end
  361. def get_response(stub, run_start_call_first: false)
  362. @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
  363. return_op: true, metadata: @metadata)
  364. expect(@op).to be_a(GRPC::ActiveCall::Operation)
  365. @op.start_call if run_start_call_first
  366. result = @op.execute
  367. result
  368. end
  369. it_behaves_like 'client streaming'
  370. def run_op_view_metadata_test(run_start_call_first)
  371. @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
  372. @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
  373. th = run_client_streamer(
  374. @sent_msgs, @resp, @pass,
  375. expected_metadata: @metadata,
  376. server_initial_md: @server_initial_md,
  377. server_trailing_md: @server_trailing_md)
  378. expect(
  379. get_response(@stub,
  380. run_start_call_first: run_start_call_first)).to eq(@resp)
  381. th.join
  382. end
  383. it 'sends metadata to the server ok when running start_call first' do
  384. run_op_view_metadata_test(true)
  385. check_op_view_of_finished_client_call(
  386. @op, @server_initial_md, @server_trailing_md
  387. ) { |r| GRPC.logger.info(r) }
  388. end
  389. it 'does not crash when used after the call has been finished' do
  390. run_op_view_metadata_test(false)
  391. check_op_view_of_finished_client_call(
  392. @op, @server_initial_md, @server_trailing_md
  393. ) { |r| GRPC.logger.info(r) }
  394. end
  395. end
  396. end
  397. describe '#server_streamer', server_streamer: true do
  398. before(:each) do
  399. @sent_msg = 'a_msg'
  400. @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
  401. end
  402. shared_examples 'server streaming' do
  403. it 'should send a request to/receive replies from a server' do
  404. server_port = create_test_server
  405. host = "localhost:#{server_port}"
  406. th = run_server_streamer(@sent_msg, @replys, @pass)
  407. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  408. expect(get_responses(stub).collect { |r| r }).to eq(@replys)
  409. th.join
  410. end
  411. it 'should raise an error if the status is not ok' do
  412. server_port = create_test_server
  413. host = "localhost:#{server_port}"
  414. th = run_server_streamer(@sent_msg, @replys, @fail)
  415. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  416. e = get_responses(stub)
  417. expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
  418. th.join
  419. end
  420. it 'should send metadata to the server ok' do
  421. server_port = create_test_server
  422. host = "localhost:#{server_port}"
  423. th = run_server_streamer(@sent_msg, @replys, @fail,
  424. expected_metadata: { k1: 'v1', k2: 'v2' })
  425. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  426. e = get_responses(stub)
  427. expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
  428. th.join
  429. end
  430. it 'should raise ArgumentError if metadata contains invalid values' do
  431. @metadata.merge!(k3: 3)
  432. server_port = create_test_server
  433. host = "localhost:#{server_port}"
  434. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  435. expect do
  436. get_responses(stub).collect { |r| r }
  437. end.to raise_error(ArgumentError,
  438. /Header values must be of type string or array/)
  439. end
  440. def run_server_streamer_against_client_with_unmarshal_error(
  441. expected_input, replys)
  442. wakey_thread do |notifier|
  443. c = expect_server_to_be_invoked(notifier)
  444. expect(c.remote_read).to eq(expected_input)
  445. begin
  446. replys.each { |r| c.remote_send(r) }
  447. rescue GRPC::Core::CallError
  448. # An attempt to write to the client might fail. This is ok
  449. # because the client call is expected to fail when
  450. # unmarshalling the first response, and to cancel the call,
  451. # and there is a race as for when the server-side call will
  452. # start to fail.
  453. p 'remote_send failed (allowed because call expected to cancel)'
  454. ensure
  455. c.send_status(OK, 'OK', true)
  456. close_active_server_call(c)
  457. end
  458. end
  459. end
  460. it 'the call terminates when there is an unmarshalling error' do
  461. server_port = create_test_server
  462. host = "localhost:#{server_port}"
  463. th = run_server_streamer_against_client_with_unmarshal_error(
  464. @sent_msg, @replys)
  465. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  466. unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
  467. expect do
  468. get_responses(stub, unmarshal: unmarshal).collect { |r| r }
  469. end.to raise_error(ArgumentError, 'test unmarshalling error')
  470. th.join
  471. end
  472. end
  473. describe 'without a call operation' do
  # Starts a server-streaming call directly on the stub and checks that the
  # result is an Enumerator.
  #
  # @param stub the client stub to call
  # @param unmarshal proc used to unmarshal responses (defaults to noop)
  # @return [Enumerator] the value returned by stub.server_streamer
  def get_responses(stub, unmarshal: noop)
    stub.server_streamer(@method, @sent_msg, noop, unmarshal,
                         metadata: @metadata).tap do |responses|
      expect(responses).to be_a(Enumerator)
    end
  end
  480. it_behaves_like 'server streaming'
  481. end
  482. describe 'via a call operation' do
  483. after(:each) do
  484. @op.wait # make sure wait doesn't hang
  485. end
  486. def get_responses(stub, run_start_call_first: false, unmarshal: noop)
  487. @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
  488. return_op: true,
  489. metadata: @metadata)
  490. expect(@op).to be_a(GRPC::ActiveCall::Operation)
  491. @op.start_call if run_start_call_first
  492. e = @op.execute
  493. expect(e).to be_a(Enumerator)
  494. e
  495. end
  496. it_behaves_like 'server streaming'
  497. def run_op_view_metadata_test(run_start_call_first)
  498. server_port = create_test_server
  499. host = "localhost:#{server_port}"
  500. @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
  501. @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
  502. th = run_server_streamer(
  503. @sent_msg, @replys, @pass,
  504. expected_metadata: @metadata,
  505. server_initial_md: @server_initial_md,
  506. server_trailing_md: @server_trailing_md)
  507. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  508. e = get_responses(stub, run_start_call_first: run_start_call_first)
  509. expect(e.collect { |r| r }).to eq(@replys)
  510. th.join
  511. end
  512. it 'should send metadata to the server ok when start_call is run first' do
  513. run_op_view_metadata_test(true)
  514. check_op_view_of_finished_client_call(
  515. @op, @server_initial_md, @server_trailing_md) do |responses|
  516. responses.each { |r| GRPC.logger.info(r) }
  517. end
  518. end
  519. it 'does not crash when used after the call has been finished' do
  520. run_op_view_metadata_test(false)
  521. check_op_view_of_finished_client_call(
  522. @op, @server_initial_md, @server_trailing_md) do |responses|
  523. responses.each { |r| GRPC.logger.info(r) }
  524. end
  525. end
  526. it 'raises GRPC::Cancelled after the call has been cancelled' do
  527. server_port = create_test_server
  528. host = "localhost:#{server_port}"
  529. th = run_server_streamer(@sent_msg, @replys, @pass)
  530. stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  531. resp = get_responses(stub, run_start_call_first: false)
  532. expect(resp.next).to eq('reply_1')
  533. @op.cancel
  534. expect { resp.next }.to raise_error(GRPC::Cancelled)
  535. th.join
  536. end
  537. end
  538. end
  539. describe '#bidi_streamer', bidi: true do
  540. before(:each) do
  541. @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
  542. @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
  543. server_port = create_test_server
  544. @host = "localhost:#{server_port}"
  545. end
  546. shared_examples 'bidi streaming' do
  547. it 'supports sending all the requests first' do
  548. th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
  549. @pass)
  550. stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  551. e = get_responses(stub)
  552. expect(e.collect { |r| r }).to eq(@replys)
  553. th.join
  554. end
  555. it 'supports client-initiated ping pong' do
  556. th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
  557. stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  558. e = get_responses(stub)
  559. expect(e.collect { |r| r }).to eq(@sent_msgs)
  560. th.join
  561. end
  562. it 'supports a server-initiated ping pong' do
  563. th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
  564. stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  565. e = get_responses(stub)
  566. expect(e.collect { |r| r }).to eq(@sent_msgs)
  567. th.join
  568. end
  569. it 'should raise an error if the status is not ok' do
  570. th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
  571. stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  572. e = get_responses(stub)
  573. expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
  574. th.join
  575. end
  576. it 'should raise ArgumentError if metadata contains invalid values' do
  577. @metadata.merge!(k3: 3)
  578. stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  579. expect do
  580. get_responses(stub).collect { |r| r }
  581. end.to raise_error(ArgumentError,
  582. /Header values must be of type string or array/)
  583. end
  584. it 'terminates if the call fails to start' do
  585. # don't start the server
  586. stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  587. expect do
  588. get_responses(stub, deadline: from_relative_time(0)).collect { |r| r }
  589. end.to raise_error(GRPC::BadStatus)
  590. end
  591. it 'should send metadata to the server ok' do
  592. th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
  593. expected_metadata: @metadata)
  594. stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  595. e = get_responses(stub)
  596. expect(e.collect { |r| r }).to eq(@sent_msgs)
  597. th.join
  598. end
  599. # Prompted by grpc/github #10526
  600. describe 'surfacing of errors when sending requests' do
  601. def run_server_bidi_send_one_then_read_indefinitely
  602. @server.start
  603. recvd_rpc = @server.request_call
  604. recvd_call = recvd_rpc.call
  605. server_call = GRPC::ActiveCall.new(
  606. recvd_call, noop, noop, INFINITE_FUTURE,
  607. metadata_received: true, started: false)
  608. server_call.send_initial_metadata
  609. server_call.remote_send('server response')
  610. loop do
  611. m = server_call.remote_read
  612. break if m.nil?
  613. end
  614. # can't fail since initial metadata already sent
  615. server_call.send_status(@pass, 'OK', true)
  616. close_active_server_call(server_call)
  617. end
  # Starts a bidi call whose marshal proc raises any request that is a
  # StandardError, reads the first response, pushes the given requests, and
  # then checks that draining the responses raises GRPC::Unknown with the
  # expected text in its message.
  #
  # @param stub the client stub to call
  # @param requests_to_push [Array] requests pushed onto request_queue after
  #   the first response has been read
  # @param request_queue queue the requests are pushed onto
  # @param expected_description [String] text the error message must include
  def verify_error_from_write_thread(stub, requests_to_push,
                                     request_queue, expected_description)
    # TODO: an improvement might be to raise the original exception from
    # bidi call write loops instead of only cancelling the call
    failing_marshal_proc = proc do |req|
      fail req if req.is_a?(StandardError)
      req
    end
    exception = nil
    begin
      responses = get_responses(stub, marshal_proc: failing_marshal_proc)
      expect(responses.next).to eq('server response')
      requests_to_push.each { |req| request_queue.push(req) }
      responses.collect { |r| r }
    rescue GRPC::Unknown => err
      exception = err
    end
    # Fail with a clear expectation when no error surfaced at all, rather
    # than with a NoMethodError on nil in the message check below.
    expect(exception).to be_a(GRPC::Unknown)
    expect(exception.message.include?(expected_description)).to be(true)
  end
  637. # Provides an Enumerable view of a Queue
  # Wraps a queue and exposes `each`, which pops and yields messages without
  # ever finishing on its own; iteration only ends if the block breaks out or
  # an error is raised.
  class BidiErrorTestingEnumerateForeverQueue
    # @param queue the queue to pop messages from
    def initialize(queue)
      @queue = queue
    end

    # Pops and yields messages from the queue in an endless loop.
    def each
      loop { yield @queue.pop }
    end
  end
  649. def run_error_in_client_request_stream_test(requests_to_push,
  650. expected_error_message)
  651. # start a server that waits on a read indefinitely - it should
  652. # see a cancellation and be able to break out
  653. th = Thread.new { run_server_bidi_send_one_then_read_indefinitely }
  654. stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  655. request_queue = Queue.new
  656. @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue)
  657. verify_error_from_write_thread(stub,
  658. requests_to_push,
  659. request_queue,
  660. expected_error_message)
  661. # the write loop error should cancel the call and end the
  662. # server's request stream
  663. th.join
  664. end
  665. it 'non-GRPC errors from the write loop surface when raised ' \
  666. 'at the start of a request stream' do
  667. expected_error_message = 'expect error on first request'
  668. requests_to_push = [StandardError.new(expected_error_message)]
  669. run_error_in_client_request_stream_test(requests_to_push,
  670. expected_error_message)
  671. end
  672. it 'non-GRPC errors from the write loop surface when raised ' \
  673. 'during the middle of a request stream' do
  674. expected_error_message = 'expect error on last request'
  675. requests_to_push = %w( one two )
  676. requests_to_push << StandardError.new(expected_error_message)
  677. run_error_in_client_request_stream_test(requests_to_push,
  678. expected_error_message)
  679. end
  680. end
  681. # Prompted by grpc/github #14853
  682. describe 'client-side error handling on bidi streams' do
  # Wraps a queue and exposes `each`, which yields popped messages until a
  # nil is popped; the nil acts as the end-of-stream marker.
  class EnumeratorQueue
    # @param queue the queue to pop messages from
    def initialize(queue)
      @queue = queue
    end

    # Yields each popped message; stops, without yielding, at the first nil.
    def each
      loop do
        break if (message = @queue.pop).nil?
        yield message
      end
    end
  end
  695. def run_server_bidi_shutdown_after_one_read
  696. @server.start
  697. recvd_rpc = @server.request_call
  698. recvd_call = recvd_rpc.call
  699. server_call = GRPC::ActiveCall.new(
  700. recvd_call, noop, noop, INFINITE_FUTURE,
  701. metadata_received: true, started: false)
  702. expect(server_call.remote_read).to eq('first message')
  703. @server.shutdown_and_notify(from_relative_time(0))
  704. @server.close
  705. end
  706. it 'receives a grpc status code when writes to a bidi stream fail' do
  707. # This test tries to trigger the case when a 'SEND_MESSAGE' op
  708. # and subsequent 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
  709. # In this case, iteration through the response stream should result
  710. # in a grpc status code, and the writer thread should not raise an
  711. # exception.
  712. server_thread = Thread.new do
  713. run_server_bidi_shutdown_after_one_read
  714. end
  715. stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
  716. request_queue = Queue.new
  717. @sent_msgs = EnumeratorQueue.new(request_queue)
  718. responses = get_responses(stub)
  719. request_queue.push('first message')
  720. # Now wait for the server to shut down.
  721. server_thread.join
  722. # Sanity check. This test is not interesting if
  723. # Thread.abort_on_exception is not set.
  724. expect(Thread.abort_on_exception).to be(true)
  725. # An attempt to send a second message should fail now that the
  726. # server is down.
  727. request_queue.push('second message')
  728. request_queue.push(nil)
  729. expect { responses.next }.to raise_error(GRPC::BadStatus)
  730. end
  # Server half of the failing-read test: starts the server, accepts one
  # call, sends initial metadata and the single message 'message', then
  # shuts the server down and closes it.
  def run_server_bidi_shutdown_after_one_write
    @server.start
    incoming_call = @server.request_call.call
    active_call = GRPC::ActiveCall.new(incoming_call, noop, noop,
                                       INFINITE_FUTURE,
                                       metadata_received: true,
                                       started: false)
    active_call.send_initial_metadata
    active_call.remote_send('message')
    # NOTE(review): from_relative_time(0) is presumably a deadline of "now",
    # i.e. shut down without a grace period -- confirm against the helper.
    shutdown_deadline = from_relative_time(0)
    @server.shutdown_and_notify(shutdown_deadline)
    @server.close
  end
  it 'receives a grpc status code when reading from a failed bidi call' do
    # The server sends exactly one message and then shuts down; the second
    # read on the client is expected to surface a grpc status.
    server_th = Thread.new { run_server_bidi_shutdown_after_one_write }
    client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    outgoing = Queue.new
    @sent_msgs = EnumeratorQueue.new(outgoing)
    replies = get_responses(client)
    first_reply = replies.next
    expect(first_reply).to eq('message')
    # Block until the server side has finished shutting down.
    server_th.join
    expect { replies.next }.to raise_error(GRPC::BadStatus)
    # The nil sentinel ends the request stream so the writer thread can
    # finish.
    outgoing.push(nil)
  end
  758. end
  759. end
  describe 'without a call operation' do
    # Starts a bidi call directly through the stub (no return_op) using
    # @method, @sent_msgs and @metadata, checks that the result is an
    # Enumerator, and returns it.
    def get_responses(stub, deadline: nil, marshal_proc: noop)
      responses = stub.bidi_streamer(
        @method, @sent_msgs, marshal_proc, noop,
        metadata: @metadata, deadline: deadline)
      expect(responses).to be_a(Enumerator)
      responses
    end

    it_behaves_like 'bidi streaming'
  end
  769. describe 'via a call operation' do
  # After every example, wait on the operation stored in @op; this also
  # checks that wait itself does not hang.
  after(:each) { @op.wait }
  # Starts a bidi call with return_op: true and stores the resulting
  # operation in @op. When run_start_call_first is set, start_call is
  # invoked on the operation before it is executed. Returns the Enumerator
  # produced by executing the operation.
  def get_responses(stub, run_start_call_first: false, deadline: nil,
                    marshal_proc: noop)
    @op = stub.bidi_streamer(
      @method, @sent_msgs, marshal_proc, noop,
      return_op: true, metadata: @metadata, deadline: deadline)
    expect(@op).to be_a(GRPC::ActiveCall::Operation)
    if run_start_call_first
      @op.start_call
    end
    responses = @op.execute
    expect(responses).to be_a(Enumerator)
    responses
  end

  it_behaves_like 'bidi streaming'
  # Runs an echoing ping-pong bidi call in which the server sends known
  # initial and trailing metadata (kept in @server_initial_md and
  # @server_trailing_md for later checks), and expects the client to get
  # back exactly the messages it sent. The operation ends up in @op via
  # get_responses.
  def run_op_view_metadata_test(run_start_call_first)
    @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
    @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
    server_th = run_bidi_streamer_echo_ping_pong(
      @sent_msgs, @pass, true,
      expected_metadata: @metadata,
      server_initial_md: @server_initial_md,
      server_trailing_md: @server_trailing_md)
    client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    responses = get_responses(client,
                              run_start_call_first: run_start_call_first)
    received = responses.map { |msg| msg }
    expect(received).to eq(@sent_msgs)
    server_th.join
  end
  it 'can run start_call before executing the call' do
    # Same flow as the op-view test, but start_call runs before execute.
    run_op_view_metadata_test(true)
    check_op_view_of_finished_client_call(
      @op, @server_initial_md, @server_trailing_md) do |responses|
      responses.each do |reply|
        GRPC.logger.info(reply)
      end
    end
  end
  it 'doesnt crash when op_view used after call has finished' do
    # Runs the call to completion without an explicit start_call, then
    # inspects the finished operation.
    run_op_view_metadata_test(false)
    check_op_view_of_finished_client_call(
      @op, @server_initial_md, @server_trailing_md) do |responses|
      responses.each do |reply|
        GRPC.logger.info(reply)
      end
    end
  end
  # Server half of the cancellation test: accepts one call, sends initial
  # metadata and one message, invokes wait_for_shutdown_ok_callback (which
  # the caller uses to hold the server back), and then shuts the server
  # down and closes it.
  def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
    @server.start
    incoming_call = @server.request_call.call
    active_call = GRPC::ActiveCall.new(incoming_call, noop, noop,
                                       INFINITE_FUTURE,
                                       metadata_received: true,
                                       started: false)
    active_call.send_initial_metadata
    active_call.remote_send('server call received')
    wait_for_shutdown_ok_callback.call
    # The client is cancelling the call, so a clean shutdown should be
    # possible here.
    @server.shutdown_and_notify(nil)
    @server.close
  end
  it 'receives a grpc status code when reading from a cancelled bidi call' do
    # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or
    # 'RECV_MESSAGE' op failure.
    # An attempt to read a message might fail; in that case, iteration
    # through the response stream should still result in a grpc status.
    #
    # server_can_shutdown is a flag guarded by the mutex and signalled
    # through the condition variable; the server thread waits on it before
    # shutting down.
    server_can_shutdown = false
    server_can_shutdown_mu = Mutex.new
    server_can_shutdown_cv = ConditionVariable.new
    # Blocks the calling thread until server_can_shutdown has been set.
    wait_for_shutdown_ok_callback = proc do
      server_can_shutdown_mu.synchronize do
        server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown
      end
    end
    server_thread = Thread.new do
      run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
    end
    stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    request_queue = Queue.new
    @sent_msgs = EnumeratorQueue.new(request_queue)
    responses = get_responses(stub)
    expect(responses.next).to eq('server call received')
    # Cancel from the client side; the next read is expected to raise
    # GRPC::Cancelled.
    @op.cancel
    expect { responses.next }.to raise_error(GRPC::Cancelled)
    # Now let the server proceed to shut down.
    server_can_shutdown_mu.synchronize do
      server_can_shutdown = true
      server_can_shutdown_cv.broadcast
    end
    server_thread.join
    # Push a sentinel to allow the writer thread to finish
    request_queue.push(nil)
  end
  859. end
  860. end
  # Server side of a server-streaming call, run inside wakey_thread: checks
  # the expected metadata entries, expects to read expected_input, sends
  # each reply in replys, then sends the status ('OK' details when status
  # equals @pass, 'NOK' otherwise) with the trailing metadata and closes the
  # call.
  def run_server_streamer(expected_input, replys, status,
                          expected_metadata: {},
                          server_initial_md: {},
                          server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier,
                                         metadata_to_send: server_initial_md)
      wanted_metadata.each { |key, val| expect(call.metadata[key.to_s]).to eq(val) }
      expect(call.remote_read).to eq(expected_input)
      replys.each do |reply|
        call.remote_send(reply)
      end
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true, metadata: server_trailing_md)
      close_active_server_call(call)
    end
  end
  # Server side of a bidi call, run inside wakey_thread, that reads all of
  # expected_inputs before sending any of replys, then sends the status
  # ('OK' details when status equals @pass, 'NOK' otherwise) and closes the
  # call.
  def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
                                            status)
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier)
      expected_inputs.each do |input|
        expect(call.remote_read).to eq(input)
      end
      replys.each do |reply|
        call.remote_send(reply)
      end
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true)
      close_active_server_call(call)
    end
  end
  # Server side of an echoing bidi call, run inside wakey_thread. For every
  # expected input the server either reads it and echoes it back (when
  # client_starts is truthy) or sends it first and then expects to read the
  # same value. Afterwards it sends the status ('OK' details when status
  # equals @pass, 'NOK' otherwise) with the trailing metadata and closes
  # the call.
  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
                                       expected_metadata: {},
                                       server_initial_md: {},
                                       server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier,
                                         metadata_to_send: server_initial_md)
      wanted_metadata.each { |key, val| expect(call.metadata[key.to_s]).to eq(val) }
      expected_inputs.each do |input|
        unless client_starts
          # Server speaks first: send, then expect the echo.
          call.remote_send(input)
          expect(call.remote_read).to eq(input)
          next
        end
        # Client speaks first: read, then echo back.
        expect(call.remote_read).to eq(input)
        call.remote_send(input)
      end
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true, metadata: server_trailing_md)
      close_active_server_call(call)
    end
  end
  # Server side of a client-streaming call, run inside wakey_thread: expects
  # to read every value in expected_inputs, checks the expected metadata
  # entries, sends the single response resp, then sends the status ('OK'
  # details when status equals @pass, 'NOK' otherwise) with the trailing
  # metadata and closes the call.
  def run_client_streamer(expected_inputs, resp, status,
                          expected_metadata: {},
                          server_initial_md: {},
                          server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier,
                                         metadata_to_send: server_initial_md)
      expected_inputs.each do |input|
        expect(call.remote_read).to eq(input)
      end
      wanted_metadata.each { |key, val| expect(call.metadata[key.to_s]).to eq(val) }
      call.remote_send(resp)
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true, metadata: server_trailing_md)
      close_active_server_call(call)
    end
  end
  # Server side of a unary call, run inside wakey_thread: expects to read
  # expected_input, checks the expected metadata entries, sends resp, then
  # sends the status ('OK' details when status equals @pass, 'NOK'
  # otherwise) with the trailing metadata and closes the call.
  def run_request_response(expected_input, resp, status,
                           expected_metadata: {},
                           server_initial_md: {},
                           server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      call = expect_server_to_be_invoked(notifier,
                                         metadata_to_send: server_initial_md)
      request = call.remote_read
      expect(request).to eq(expected_input)
      wanted_metadata.each { |key, val| expect(call.metadata[key.to_s]).to eq(val) }
      call.remote_send(resp)
      details = status == @pass ? 'OK' : 'NOK'
      call.send_status(status, details, true, metadata: server_trailing_md)
      close_active_server_call(call)
    end
  end
  # Builds a core test server into @server with server credentials made
  # from entries 1 and 2 of load_test_certs (used as private key and cert
  # chain), and adds an HTTP/2 port on '0.0.0.0:0'. Returns whatever
  # add_http2_port returns.
  def create_secure_test_server
    certs = load_test_certs
    key_cert_pair = { private_key: certs[1], cert_chain: certs[2] }
    secure_credentials = GRPC::Core::ServerCredentials.new(
      nil, [key_cert_pair], false)
    @server = new_core_server_for_testing(nil)
    @server.add_http2_port('0.0.0.0:0', secure_credentials)
  end
  # Builds a core test server into @server and adds an insecure HTTP/2 port
  # on '0.0.0.0:0'. Returns whatever add_http2_port returns.
  def create_test_server
    @server = new_core_server_for_testing(nil)
    listen_address = '0.0.0.0:0'
    @server.add_http2_port(listen_address, :this_port_is_insecure)
  end
  # Starts @server, signals the notifier, then accepts one incoming call.
  # The received metadata is copied onto the call, initial metadata
  # (metadata_to_send) is sent in a batch, and the call is returned wrapped
  # in a GRPC::ActiveCall.
  def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
    @server.start
    notifier.notify(nil)
    incoming_rpc = @server.request_call
    incoming_call = incoming_rpc.call
    incoming_call.metadata = incoming_rpc.metadata
    incoming_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
    GRPC::ActiveCall.new(
      incoming_call, noop, noop, INFINITE_FUTURE,
      metadata_received: true)
  end
  971. end