GRPCCallLegacy.m 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678
  1. /*
  2. *
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #import "GRPCCallLegacy.h"
  19. #import "GRPCCall+OAuth2.h"
  20. #import "GRPCCallOptions.h"
  21. #import "GRPCTypes.h"
  22. #import "private/GRPCCore/GRPCChannelPool.h"
  23. #import "private/GRPCCore/GRPCCompletionQueue.h"
  24. #import "private/GRPCCore/GRPCHost.h"
  25. #import "private/GRPCCore/GRPCWrappedCall.h"
  26. #import "private/GRPCCore/NSData+GRPC.h"
  27. #import <RxLibrary/GRXBufferedPipe.h>
  28. #import <RxLibrary/GRXConcurrentWriteable.h>
  29. #import <RxLibrary/GRXImmediateSingleWriter.h>
  30. #import <RxLibrary/GRXWriter+Immediate.h>
  31. #include <grpc/grpc.h>
  // NOTE(review): presumably the name of the setting that selects the CFStream transport; it is
  // not referenced anywhere in this file -- confirm against its users.
  const char *kCFStreamVarName = "grpc_cfstream";

  // Initial-metadata flags keyed by "host/path", filled in by +setCallSafety:host:path:. Created
  // in +initialize and only touched inside @synchronized(callFlags).
  static NSMutableDictionary *callFlags;

  // A client op batch holds at most 6 ops: SEND_INITIAL_METADATA, SEND_MESSAGE,
  // SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE and RECV_STATUS_ON_CLIENT.
  NSInteger kMaxClientBatch = 6;

  // Request header name for the OAuth2 credential.
  static NSString *const kAuthorizationHeader = @"authorization";

  // Text placed in front of an access token to build the authorization header value.
  static NSString *const kBearerPrefix = @"Bearer ";
  // Private class extension: adds GRXWriteable conformance and redeclares the response metadata
  // properties so that the implementation can assign them.
  @interface GRPCCall () <GRXWriteable>

  // Response headers, assigned by the implementation when they are received.
  @property(atomic, strong) NSDictionary *responseHeaders;

  // Response trailers, assigned by the implementation when the call completes.
  @property(atomic, strong) NSDictionary *responseTrailers;

  // Adds |numberOfMessages| to the number of messages the user is willing to receive.
  - (void)receiveNextMessages:(NSUInteger)numberOfMessages;

  @end
  46. // The following methods of a C gRPC call object aren't reentrant, and thus
  47. // calls to them must be serialized:
  48. // - start_batch
  49. // - destroy
  50. //
  51. // start_batch with a SEND_MESSAGE argument can only be called after the
  52. // OP_COMPLETE event for any previous write is received. This is achieved by
  53. // pausing the requests writer immediately every time it writes a value, and
  54. // resuming it again when OP_COMPLETE is received.
  55. //
  56. // Similarly, start_batch with a RECV_MESSAGE argument can only be called after
  57. // the OP_COMPLETE event for any previous read is received. This is easier to
  58. // enforce, as we're writing the received messages into the writeable:
  59. // start_batch is enqueued once upon receiving the OP_COMPLETE event for the
  60. // RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
  61. // each RECV_MESSAGE batch.
  62. @implementation GRPCCall {
  63. dispatch_queue_t _callQueue;
  64. NSString *_host;
  65. NSString *_path;
  66. GRPCCallSafety _callSafety;
  67. GRPCCallOptions *_callOptions;
  68. GRPCWrappedCall *_wrappedCall;
  69. // The C gRPC library has fewer guarantees on the ordering of events than we
  70. // do. Particularly, in the face of errors, there's no ordering guarantee at
  71. // all. This wrapper over our actual writeable ensures thread-safety and
  72. // correct ordering.
  73. GRXConcurrentWriteable *_responseWriteable;
  74. // The network thread wants the requestWriter to resume (when the server is ready for more input),
  75. // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
  76. // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
  77. // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
  78. // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
  79. // pause the writer immediately on writeValue:, so we need our locking to be recursive.
  80. GRXWriter *_requestWriter;
  81. // To create a retain cycle when a call is started, up until it finishes. See
  82. // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
  83. // reference to the call object if all they're interested in is the handler being executed when
  84. // the response arrives.
  85. GRPCCall *_retainSelf;
  86. GRPCRequestHeaders *_requestHeaders;
  87. // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
  88. // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
  89. // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
  90. // the SendClose op is added.
  91. BOOL _unaryCall;
  92. NSMutableArray *_unaryOpBatch;
  93. // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
  94. // queue
  95. dispatch_queue_t _responseQueue;
  96. // The OAuth2 token fetched from a token provider.
  97. NSString *_fetchedOauth2AccessToken;
  98. // The callback to be called when a write message op is done.
  99. void (^_writeDone)(void);
  100. // Indicate a read request to core is pending.
  101. BOOL _pendingCoreRead;
  102. // Indicate pending read message request from user.
  103. NSUInteger _pendingReceiveNextMessages;
  104. }
  105. @synthesize state = _state;
  // One-time, class-wide setup: initializes the gRPC C library and creates the shared callFlags
  // table. The receiver check keeps the setup from running again on behalf of a subclass. See
  // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc
  + (void)initialize {
    if (self != [GRPCCall self]) {
      return;
    }
    grpc_init();
    callFlags = [[NSMutableDictionary alloc] init];
  }
  // Registers the initial-metadata flags implied by |callSafety| for the |host| / |path| pair in
  // the shared callFlags table. Nothing is stored when |host| or |path| is empty, or when
  // |callSafety| is not one of the three known values.
  + (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
    if (host.length == 0 || path.length == 0) {
      return;
    }

    // Translate the safety level into the flag value first; nil means "unknown, store nothing".
    NSNumber *flags = nil;
    switch (callSafety) {
      case GRPCCallSafetyDefault:
        flags = @0;
        break;
      case GRPCCallSafetyIdempotentRequest:
        flags = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
        break;
      case GRPCCallSafetyCacheableRequest:
        flags = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
        break;
      default:
        break;
    }
    if (flags == nil) {
      return;
    }

    NSString *key = [NSString stringWithFormat:@"%@/%@", host, path];
    @synchronized(callFlags) {
      callFlags[key] = flags;
    }
  }
  // Returns the initial-metadata flags registered for the |host| / |path| pair through
  // +setCallSafety:host:path:, or 0 when nothing was registered (messaging the missing, nil entry
  // yields 0).
  + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
    NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
    @synchronized(callFlags) {
      // The flags are an unsigned bit mask, so read them back with the unsigned accessor to match
      // the uint32_t return type.
      return [callFlags[hostAndPath] unsignedIntValue];
    }
  }
  // Convenience initializer: default call safety, no call options and no write-done callback.
  - (instancetype)initWithHost:(NSString *)host
                          path:(NSString *)path
                requestsWriter:(GRXWriter *)requestWriter {
    self = [self initWithHost:host
                         path:path
                   callSafety:GRPCCallSafetyDefault
               requestsWriter:requestWriter
                  callOptions:nil
                    writeDone:nil];
    return self;
  }
  // Full initializer. Returns nil (and asserts in builds where NSAssert is active) when |host| or
  // |path| is nil, when |safety| is beyond GRPCCallSafetyCacheableRequest, or when
  // |requestsWriter| is not in the not-started state.
  - (instancetype)initWithHost:(NSString *)host
                          path:(NSString *)path
                    callSafety:(GRPCCallSafety)safety
                requestsWriter:(GRXWriter *)requestsWriter
                   callOptions:(GRPCCallOptions *)callOptions
                     writeDone:(void (^)(void))writeDone {
    // Purposely using pointer rather than length (host.length == 0) for backwards compatibility.
    NSAssert(host != nil && path != nil, @"Neither host nor path can be nil.");
    NSAssert(safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
    NSAssert(requestsWriter.state == GRXWriterStateNotStarted,
             @"The requests writer can't be already started.");

    // Same conditions again as runtime checks, for builds in which the assertions are disabled.
    BOOL argumentsAreValid = host != nil && path != nil &&
                             safety <= GRPCCallSafetyCacheableRequest &&
                             requestsWriter.state == GRXWriterStateNotStarted;
    if (!argumentsAreValid) {
      return nil;
    }

    self = [super init];
    if (self == nil) {
      return nil;
    }

    _host = [host copy];
    _path = [path copy];
    _callSafety = safety;
    _callOptions = [callOptions copy];
    // Serial queue to invoke the non-reentrant methods of the grpc_call object.
    _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);
    _requestWriter = requestsWriter;
    _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
    _writeDone = writeDone;

    // An immediate single writer marks the call as unary; its ops are collected in _unaryOpBatch.
    BOOL isUnary = [requestsWriter isKindOfClass:[GRXImmediateSingleWriter class]];
    if (isUnary) {
      _unaryCall = YES;
      _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
    }

    _responseQueue = dispatch_get_main_queue();

    // Do not start a read until initial metadata is received: the read is flagged as pending
    // from the beginning.
    _pendingReceiveNextMessages = 0;
    _pendingCoreRead = YES;
    return self;
  }
  // Replaces the queue on which responses are delivered. Ignored once the call has left the
  // not-started state.
  - (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
    @synchronized(self) {
      if (_state == GRXWriterStateNotStarted) {
        _responseQueue = queue;
      }
    }
  }
  #pragma mark Finish

  // Moves the call to the finished state and notifies the response writeable: cancellation with
  // |errorOrNil| when an error is given, successful completion otherwise. A call that is already
  // finished is left untouched.
  //
  // Must stay callable from inside another method's @synchronized(self) block, and must not touch
  // _requestWriter, to prevent deadlocks.
  - (void)finishWithError:(NSError *)errorOrNil {
    @synchronized(self) {
      if (_state == GRXWriterStateFinished) {
        return;
      }
      _state = GRXWriterStateFinished;

      if (errorOrNil == nil) {
        [_responseWriteable enqueueSuccessfulCompletion];
      } else {
        [_responseWriteable cancelWithError:errorOrNil];
      }

      // Break the self-retain; if nobody else holds the call it can be deallocated now.
      _retainSelf = nil;
    }
  }
  // Cancels the call on behalf of the app: finishes it with a "cancelled" error, cancels the
  // wrapped call and then finishes the request writer. Does nothing if the call already finished.
  - (void)cancel {
    @synchronized(self) {
      if (_state == GRXWriterStateFinished) {
        return;
      }
      NSError *cancellation =
          [NSError errorWithDomain:kGRPCErrorDomain
                              code:GRPCErrorCodeCancelled
                          userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}];
      [self finishWithError:cancellation];
      [_wrappedCall cancel];
    }
    // The request writer is deliberately updated outside of the lock.
    _requestWriter.state = GRXWriterStateFinished;
  }
  // Hands the last strong reference to the wrapped call over to the call queue, so that the
  // wrapped call is released there, serialized with the other work submitted to that queue.
  - (void)dealloc {
    // The designated initializer returns nil before _callQueue is created when its arguments are
    // invalid. If such an instance is deallocated there is no queue and no wrapped call, and
    // dispatch_async must not be given a NULL queue.
    if (_callQueue == nil) {
      return;
    }
    __block GRPCWrappedCall *wrappedCall = _wrappedCall;
    dispatch_async(_callQueue, ^{
      // Dropping the __block reference here is what moves the release onto the call queue.
      wrappedCall = nil;
    });
  }
  #pragma mark Read messages

  // Starts a batch with a single receive-message op on the wrapped call.
  // Only called from the call queue; |handler| will be called from the network queue.
  - (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
    // TODO(jcanizales): Add error handlers for async failures
    GRPCOpRecvMessage *receiveOp = [[GRPCOpRecvMessage alloc] initWithHandler:handler];
    [_wrappedCall startBatchWithOperations:@[ receiveOp ]];
  }
  // Requests the next message from the server, unless the call is not in the started state or
  // flow control forbids it.
  //
  // Called initially from the network queue once response headers are received, then
  // "recursively" from the responseWriteable queue after each response from the server has been
  // written. If the call is currently paused, this is a noop. Restarting the call will invoke
  // this method.
  // TODO(jcanizales): Rename to readResponseIfNotPaused.
  - (void)maybeStartNextRead {
    @synchronized(self) {
      if (_state != GRXWriterStateStarted) {
        return;
      }
      // With flow control, read only when no core read is outstanding and the user has asked for
      // at least one more message.
      if (_callOptions.flowControlEnabled &&
          (_pendingCoreRead || _pendingReceiveNextMessages == 0)) {
        return;
      }
      _pendingCoreRead = YES;
      // Without flow control this point is reached even when the user never requested messages,
      // i.e. with the counter at zero. The counter is unsigned, so only decrement it when it is
      // positive instead of letting it wrap around.
      if (_pendingReceiveNextMessages > 0) {
        _pendingReceiveNextMessages--;
      }
    }

    dispatch_async(_callQueue, ^{
      __weak GRPCCall *weakSelf = self;
      [self startReadWithHandler:^(grpc_byte_buffer *message) {
        if (message == NULL) {
          // No more messages from the server
          return;
        }
        __strong GRPCCall *strongSelf = weakSelf;
        if (strongSelf == nil) {
          // The call is gone; the buffer still has to be released.
          grpc_byte_buffer_destroy(message);
          return;
        }
        NSData *data = [NSData grpc_dataWithByteBuffer:message];
        grpc_byte_buffer_destroy(message);
        if (!data) {
          // The app doesn't have enough memory to hold the server response. We
          // don't want to throw, because the app shouldn't crash for a behavior
          // that's on the hands of any server to have. Instead we finish and ask
          // the server to cancel.
          @synchronized(strongSelf) {
            strongSelf->_pendingCoreRead = NO;
            [strongSelf
                finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                    code:GRPCErrorCodeResourceExhausted
                                                userInfo:@{
                                                  NSLocalizedDescriptionKey :
                                                      @"Client does not have enough memory to "
                                                      @"hold the server response."
                                                }]];
            [strongSelf->_wrappedCall cancel];
          }
          strongSelf->_requestWriter.state = GRXWriterStateFinished;
        } else {
          @synchronized(strongSelf) {
            // Once the value has been handed to the writeable, the core read is no longer
            // pending and the next read may be started.
            [strongSelf->_responseWriteable enqueueValue:data
                                       completionHandler:^{
                                         __strong GRPCCall *innerSelf = weakSelf;
                                         if (innerSelf) {
                                           @synchronized(innerSelf) {
                                             innerSelf->_pendingCoreRead = NO;
                                             [innerSelf maybeStartNextRead];
                                           }
                                         }
                                       }];
          }
        }
      }];
    });
  }
  #pragma mark Send headers

  // Builds the initial metadata (the request headers plus, when a token is available, the
  // authorization header) and enqueues the send-metadata op on the call queue. For a unary call
  // the op is appended to _unaryOpBatch instead of being started right away.
  - (void)sendHeaders {
    // TODO (mxyan): Remove after deprecated methods are removed
    uint32_t callSafetyFlags = 0;
    switch (_callSafety) {
      case GRPCCallSafetyDefault:
        callSafetyFlags = 0;
        break;
      case GRPCCallSafetyIdempotentRequest:
        callSafetyFlags = GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
        break;
      case GRPCCallSafetyCacheableRequest:
        callSafetyFlags = GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
        break;
    }

    NSMutableDictionary *headers = [_requestHeaders mutableCopy];

    // A token fetched from the token provider takes precedence over the one in the call options.
    NSString *accessToken;
    @synchronized(self) {
      accessToken = _fetchedOauth2AccessToken;
    }
    if (accessToken == nil) {
      accessToken = _callOptions.oauth2AccessToken;
    }
    if (accessToken != nil) {
      headers[kAuthorizationHeader] = [kBearerPrefix stringByAppendingString:accessToken];
    }

    // TODO(jcanizales): Add error handlers for async failures
    GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
        initWithMetadata:headers
                   flags:callSafetyFlags
                 handler:nil];  // No clean-up needed after SEND_INITIAL_METADATA
    dispatch_async(_callQueue, ^{
      if (!self->_unaryCall) {
        [self->_wrappedCall startBatchWithOperations:@[ op ]];
      } else {
        [self->_unaryOpBatch addObject:op];
      }
    });
  }
  // Adds |numberOfMessages| to the count of messages the user is ready to receive and, when the
  // call is started with flow control enabled, tries to start the next read. A count of zero is
  // ignored.
  - (void)receiveNextMessages:(NSUInteger)numberOfMessages {
    if (numberOfMessages == 0) {
      return;
    }
    @synchronized(self) {
      _pendingReceiveNextMessages += numberOfMessages;
      BOOL mayRead = (_state == GRXWriterStateStarted) && _callOptions.flowControlEnabled;
      if (mayRead) {
        [self maybeStartNextRead];
      }
    }
  }
  #pragma mark GRXWriteable implementation

  // Sends |message| to the server. The op's handler puts the request writer back into the started
  // state and then invokes the write-done callback, if one was supplied.
  //
  // Only called from the call queue. The error handler will be called from the network queue if
  // the write didn't succeed. If the call is a unary call, parameter \a errorHandler will be
  // ignored and the error handler of GRPCOpSendClose will be executed in case of error.
  - (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
    __weak GRPCCall *weakSelf = self;
    GRPCOpSendMessage *sendOp =
        [[GRPCOpSendMessage alloc] initWithMessage:message
                                           handler:^{
                                             GRPCCall *strongSelf = weakSelf;
                                             if (strongSelf == nil) {
                                               return;
                                             }
                                             // Resume the request writer.
                                             strongSelf->_requestWriter.state =
                                                 GRXWriterStateStarted;
                                             if (strongSelf->_writeDone) {
                                               strongSelf->_writeDone();
                                             }
                                           }];

    if (_unaryCall) {
      // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
      // TODO (mxyan): unify the error handlers of all Ops into a single closure.
      [_unaryOpBatch addObject:sendOp];
      return;
    }
    [_wrappedCall startBatchWithOperations:@[ sendOp ] errorHandler:errorHandler];
  }
  // GRXWriteable entry point: accepts one request message (an NSData), pauses the request writer
  // and schedules the write on the call queue. Values arriving after the call has finished are
  // dropped.
  - (void)writeValue:(id)value {
    NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");

    BOOL alreadyFinished;
    @synchronized(self) {
      alreadyFinished = (_state == GRXWriterStateFinished);
    }
    if (alreadyFinished) {
      return;
    }

    // Pause the input and only resume it when the C layer notifies us that writes
    // can proceed.
    _requestWriter.state = GRXWriterStatePaused;

    dispatch_async(_callQueue, ^{
      // Write error is not processed here. It is handled by op batch of
      // GRPC_OP_RECV_STATUS_ON_CLIENT
      [self writeMessage:value withErrorHandler:nil];
    });
  }
  // Closes the request stream. A streaming call starts a batch with just the send-close op; a
  // unary call appends the op to _unaryOpBatch and starts that whole batch.
  //
  // Only called from the call queue. The error handler will be called from the network queue if
  // the requests stream couldn't be closed successfully.
  - (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
    GRPCOpSendClose *closeOp = [[GRPCOpSendClose alloc] init];
    NSArray *batch = nil;
    if (_unaryCall) {
      [_unaryOpBatch addObject:closeOp];
      batch = _unaryOpBatch;
    } else {
      batch = @[ closeOp ];
    }
    [_wrappedCall startBatchWithOperations:batch errorHandler:errorHandler];
  }
  // GRXWriteable entry point signalling the end of the request stream. An error cancels the call;
  // otherwise the request stream is closed on the call queue.
  - (void)writesFinishedWithError:(NSError *)errorOrNil {
    if (errorOrNil != nil) {
      [self cancel];
      return;
    }
    dispatch_async(_callQueue, ^{
      // EOS error is not processed here. It is handled by op batch of
      // GRPC_OP_RECV_STATUS_ON_CLIENT
      [self finishRequestWithErrorHandler:nil];
    });
  }
  #pragma mark Invoke

  // Enqueues, on the call queue, one batch that receives the response metadata and one that
  // receives the final status.
  //
  // Both handlers will eventually be called, from the network queue. Writes can start immediately
  // after this.
  // |headersHandler| is called when the response headers are received.
  // |completionHandler| is called whenever the RPC finishes for any reason.
  - (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
                     completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
    dispatch_async(_callQueue, ^{
      // TODO(jcanizales): Add error handlers for async failures
      GRPCOpRecvMetadata *metadataOp = [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler];
      [self->_wrappedCall startBatchWithOperations:@[ metadataOp ]];

      GRPCOpRecvStatus *statusOp = [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler];
      [self->_wrappedCall startBatchWithOperations:@[ statusOp ]];
    });
  }
  // Installs the two handlers of the call. The headers handler stores a copy of the response
  // headers and allows the first read. The completion handler stores the trailers, attaches
  // trailers and headers to the error (if any), finishes the call and finishes the request
  // writer. Both handlers reference the call weakly.
  - (void)invokeCall {
    __weak GRPCCall *weakSelf = self;

    void (^onHeaders)(NSDictionary *) = ^(NSDictionary *headers) {
      // Response headers received.
      __strong GRPCCall *strongSelf = weakSelf;
      if (strongSelf == nil) {
        return;
      }
      @synchronized(strongSelf) {
        // it is ok to set nil because headers are only received once
        strongSelf.responseHeaders = nil;
        // copy the header so that the GRPCOpRecvMetadata object may be dealloc'ed
        NSDictionary *copiedHeaders = [[NSDictionary alloc] initWithDictionary:headers
                                                                     copyItems:YES];
        strongSelf.responseHeaders = copiedHeaders;
        strongSelf->_pendingCoreRead = NO;
        [strongSelf maybeStartNextRead];
      }
    };

    void (^onCompletion)(NSError *, NSDictionary *) = ^(NSError *error, NSDictionary *trailers) {
      __strong GRPCCall *strongSelf = weakSelf;
      if (strongSelf == nil) {
        return;
      }
      strongSelf.responseTrailers = trailers;
      if (error) {
        NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
        if (error.userInfo) {
          [userInfo addEntriesFromDictionary:error.userInfo];
        }
        userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
        // Since gRPC core does not guarantee the headers block being called before this block,
        // responseHeaders might be nil.
        userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
        error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
      }
      [strongSelf finishWithError:error];
      strongSelf->_requestWriter.state = GRXWriterStateFinished;
    };

    [self invokeCallWithHeadersHandler:onHeaders completionHandler:onCompletion];
  }
  #pragma mark GRXWriter implementation

  // Creates the response writeable and the wrapped call, sends the headers, installs the
  // response handlers and finally starts the request writer. Returns early if the call has
  // already finished, and finishes the call with an "unavailable" error when the wrapped call
  // cannot be created. The set-up part runs under @synchronized(self).
  - (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
    @synchronized(self) {
      if (_state == GRXWriterStateFinished) {
        return;
      }

      _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable
                                                               dispatchQueue:_responseQueue];

      GRPCChannelPool *pool = [GRPCChannelPool sharedInstance];
      GRPCPooledChannel *channel = [pool channelWithHost:_host callOptions:_callOptions];
      _wrappedCall = [channel wrappedCallWithPath:_path
                                  completionQueue:[GRPCCompletionQueue completionQueue]
                                      callOptions:_callOptions];
      if (_wrappedCall == nil) {
        NSError *unavailable = [NSError
            errorWithDomain:kGRPCErrorDomain
                       code:GRPCErrorCodeUnavailable
                   userInfo:@{NSLocalizedDescriptionKey : @"Failed to create call or channel."}];
        [self finishWithError:unavailable];
        return;
      }

      [self sendHeaders];
      [self invokeCall];
    }

    // Now that the RPC has been initiated, request writes can start.
    [_requestWriter startWithWriteable:self];
  }
  // Starts the call, delivering responses to |writeable|. Marks the call as started, makes it
  // retain itself, derives the call options for users of the deprecated interface, and then
  // either starts the RPC directly or first asks the auth token provider for a token.
  - (void)startWithWriteable:(id<GRXWriteable>)writeable {
    id<GRPCAuthorizationProtocol> tokenProvider = nil;
    @synchronized(self) {
      _state = GRXWriterStateStarted;

      // Create a retain cycle so that this instance lives until the RPC finishes (or is
      // cancelled). This makes RPCs in which the call isn't externally retained possible (as long
      // as it is started before being autoreleased). Care is taken not to retain self strongly in
      // any of the blocks used in this implementation, so that the life of the instance is
      // determined by this retain cycle.
      _retainSelf = self;

      // If _callOptions is nil, people must be using the deprecated v1 interface. In this case,
      // generate the call options from the corresponding GRPCHost configs and apply other options
      // that are not covered by GRPCHost.
      if (_callOptions == nil) {
        GRPCMutableCallOptions *legacyOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
        if (_serverName.length != 0) {
          legacyOptions.serverAuthority = _serverName;
        }
        if (_timeout > 0) {
          legacyOptions.timeout = _timeout;
        }

        // Map flags registered through +setCallSafety:host:path: back to a call safety level.
        uint32_t registeredFlags = [GRPCCall callFlagsForHost:_host path:_path];
        if (registeredFlags != 0) {
          if (registeredFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
            _callSafety = GRPCCallSafetyIdempotentRequest;
          } else if (registeredFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
            _callSafety = GRPCCallSafetyCacheableRequest;
          }
        }

        id<GRPCAuthorizationProtocol> legacyProvider = self.tokenProvider;
        if (legacyProvider != nil) {
          legacyOptions.authTokenProvider = legacyProvider;
        }
        _callOptions = legacyOptions;
      }

      NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
               @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
      tokenProvider = _callOptions.authTokenProvider;
    }

    if (tokenProvider == nil) {
      [self startCallWithWriteable:writeable];
      return;
    }

    // A provider is configured: fetch the token first, then start the call unless it finished
    // in the meantime.
    __weak typeof(self) weakSelf = self;
    [tokenProvider getTokenWithHandler:^(NSString *token) {
      __strong typeof(self) strongSelf = weakSelf;
      if (strongSelf == nil) {
        return;
      }
      BOOL startCall = NO;
      @synchronized(strongSelf) {
        if (strongSelf->_state != GRXWriterStateFinished) {
          startCall = YES;
          if (token) {
            strongSelf->_fetchedOauth2AccessToken = [token copy];
          }
        }
      }
      if (startCall) {
        [strongSelf startCallWithWriteable:writeable];
      }
    }];
  }
  // Manual state transition requested by the user of the writer. Only honoured while the call is
  // started or paused:
  //  - Finished: silently cancels and drops the response writeable.
  //  - Paused:   records the new state.
  //  - Started:  only from the paused state; also tries to start the next read.
  //  - NotStarted: ignored.
  - (void)setState:(GRXWriterState)newState {
    @synchronized(self) {
      // Manual transitions are only allowed from the started or paused states.
      BOOL transitionAllowed =
          !(_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished);
      if (!transitionAllowed) {
        return;
      }

      if (newState == GRXWriterStateFinished) {
        _state = newState;
        // Per GRXWriter's contract, setting the state to Finished manually
        // means one doesn't wish the writeable to be messaged anymore.
        [_responseWriteable cancelSilently];
        _responseWriteable = nil;
      } else if (newState == GRXWriterStatePaused) {
        _state = newState;
      } else if (newState == GRXWriterStateStarted && _state == GRXWriterStatePaused) {
        _state = newState;
        [self maybeStartNextRead];
      }
    }
  }
  597. @end