AsyncCallServer.cs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. #region Copyright notice and license
  2. // Copyright 2015 gRPC authors.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. #endregion
  16. using System;
  17. using System.Diagnostics;
  18. using System.IO;
  19. using System.Runtime.CompilerServices;
  20. using System.Runtime.InteropServices;
  21. using System.Threading;
  22. using System.Threading.Tasks;
  23. using Grpc.Core.Internal;
  24. using Grpc.Core.Utils;
  25. namespace Grpc.Core.Internal
  26. {
  27. /// <summary>
  28. /// Manages server side native call lifecycle.
  29. /// </summary>
  30. internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>, IReceivedCloseOnServerCallback, ISendStatusFromServerCompletionCallback
  31. {
  32. readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
  33. readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
  34. readonly Server server;
  35. public AsyncCallServer(Action<TResponse, SerializationContext> serializer, Func<DeserializationContext, TRequest> deserializer, Server server) : base(serializer, deserializer)
  36. {
  37. this.server = GrpcPreconditions.CheckNotNull(server);
  38. }
  39. public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
  40. {
  41. call.Initialize(completionQueue);
  42. server.AddCallReference(this);
  43. InitializeInternal(call);
  44. }
  45. /// <summary>
  46. /// Only for testing purposes.
  47. /// </summary>
  48. public void InitializeForTesting(INativeCall call)
  49. {
  50. server.AddCallReference(this);
  51. InitializeInternal(call);
  52. }
  53. /// <summary>
  54. /// Starts a server side call.
  55. /// </summary>
  56. public Task ServerSideCallAsync()
  57. {
  58. lock (myLock)
  59. {
  60. GrpcPreconditions.CheckNotNull(call);
  61. started = true;
  62. call.StartServerSide(ReceiveCloseOnServerCallback);
  63. return finishedServersideTcs.Task;
  64. }
  65. }
  66. /// <summary>
  67. /// Sends a streaming response. Only one pending send action is allowed at any given time.
  68. /// </summary>
  69. public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
  70. {
  71. return SendMessageInternalAsync(msg, writeFlags);
  72. }
  73. /// <summary>
  74. /// Receives a streaming request. Only one pending read action is allowed at any given time.
  75. /// </summary>
  76. public Task<TRequest> ReadMessageAsync()
  77. {
  78. return ReadMessageInternalAsync();
  79. }
  80. /// <summary>
  81. /// Initiates sending a initial metadata.
  82. /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
  83. /// to make things simpler.
  84. /// </summary>
  85. public Task SendInitialMetadataAsync(Metadata headers)
  86. {
  87. lock (myLock)
  88. {
  89. GrpcPreconditions.CheckNotNull(headers, "metadata");
  90. GrpcPreconditions.CheckState(started);
  91. GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
  92. GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
  93. var earlyResult = CheckSendAllowedOrEarlyResult();
  94. if (earlyResult != null)
  95. {
  96. return earlyResult;
  97. }
  98. using (var metadataArray = MetadataArraySafeHandle.Create(headers))
  99. {
  100. call.StartSendInitialMetadata(SendCompletionCallback, metadataArray);
  101. }
  102. this.initialMetadataSent = true;
  103. streamingWriteTcs = new TaskCompletionSource<object>();
  104. return streamingWriteTcs.Task;
  105. }
  106. }
  107. /// <summary>
  108. /// Sends call result status, indicating we are done with writes.
  109. /// Sending a status different from StatusCode.OK will also implicitly cancel the call.
  110. /// </summary>
  111. public Task SendStatusFromServerAsync(Status status, Metadata trailers, ResponseWithFlags? optionalWrite)
  112. {
  113. using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
  114. {
  115. var payload = optionalWrite.HasValue ? UnsafeSerialize(optionalWrite.Value.Response, serializationScope.Context) : SliceBufferSafeHandle.NullInstance;
  116. var writeFlags = optionalWrite.HasValue ? optionalWrite.Value.WriteFlags : default(WriteFlags);
  117. lock (myLock)
  118. {
  119. GrpcPreconditions.CheckState(started);
  120. GrpcPreconditions.CheckState(!disposed);
  121. GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
  122. using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
  123. {
  124. call.StartSendStatusFromServer(SendStatusFromServerCompletionCallback, status, metadataArray, !initialMetadataSent,
  125. payload, writeFlags);
  126. }
  127. halfcloseRequested = true;
  128. initialMetadataSent = true;
  129. sendStatusFromServerTcs = new TaskCompletionSource<object>();
  130. if (optionalWrite.HasValue)
  131. {
  132. streamingWritesCounter++;
  133. }
  134. return sendStatusFromServerTcs.Task;
  135. }
  136. }
  137. }
  138. /// <summary>
  139. /// Gets cancellation token that gets cancelled once close completion
  140. /// is received and the cancelled flag is set.
  141. /// </summary>
  142. public CancellationToken CancellationToken
  143. {
  144. get
  145. {
  146. return cancellationTokenSource.Token;
  147. }
  148. }
  149. public string Peer
  150. {
  151. get
  152. {
  153. return call.GetPeer();
  154. }
  155. }
  156. protected override bool IsClient
  157. {
  158. get { return false; }
  159. }
  160. protected override Exception GetRpcExceptionClientOnly()
  161. {
  162. throw new InvalidOperationException("Call be only called for client calls");
  163. }
  164. protected override void OnAfterReleaseResourcesLocked()
  165. {
  166. server.RemoveCallReference(this);
  167. }
  168. protected override Task CheckSendAllowedOrEarlyResult()
  169. {
  170. GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
  171. GrpcPreconditions.CheckState(!finished, "Already finished.");
  172. GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
  173. GrpcPreconditions.CheckState(!disposed);
  174. return null;
  175. }
  176. /// <summary>
  177. /// Handles the server side close completion.
  178. /// </summary>
  179. private void HandleFinishedServerside(bool success, bool cancelled)
  180. {
  181. // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
  182. // success will be always set to true.
  183. bool releasedResources;
  184. lock (myLock)
  185. {
  186. finished = true;
  187. if (streamingReadTcs == null)
  188. {
  189. // if there's no pending read, readingDone=true will dispose now.
  190. // if there is a pending read, we will dispose once that read finishes.
  191. readingDone = true;
  192. streamingReadTcs = new TaskCompletionSource<TRequest>();
  193. streamingReadTcs.SetResult(default(TRequest));
  194. }
  195. releasedResources = ReleaseResourcesIfPossible();
  196. }
  197. if (releasedResources)
  198. {
  199. OnAfterReleaseResourcesUnlocked();
  200. }
  201. if (cancelled)
  202. {
  203. cancellationTokenSource.Cancel();
  204. }
  205. finishedServersideTcs.SetResult(null);
  206. }
  207. IReceivedCloseOnServerCallback ReceiveCloseOnServerCallback => this;
  208. void IReceivedCloseOnServerCallback.OnReceivedCloseOnServer(bool success, bool cancelled)
  209. {
  210. HandleFinishedServerside(success, cancelled);
  211. }
  212. ISendStatusFromServerCompletionCallback SendStatusFromServerCompletionCallback => this;
  213. void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success)
  214. {
  215. HandleSendStatusFromServerFinished(success);
  216. }
  217. public struct ResponseWithFlags
  218. {
  219. public ResponseWithFlags(TResponse response, WriteFlags writeFlags)
  220. {
  221. this.Response = response;
  222. this.WriteFlags = writeFlags;
  223. }
  224. public TResponse Response { get; }
  225. public WriteFlags WriteFlags { get; }
  226. }
  227. }
  228. }