AsyncCallBase.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. #region Copyright notice and license
  2. // Copyright 2015, Google Inc.
  3. // All rights reserved.
  4. //
  5. // Redistribution and use in source and binary forms, with or without
  6. // modification, are permitted provided that the following conditions are
  7. // met:
  8. //
  9. // * Redistributions of source code must retain the above copyright
  10. // notice, this list of conditions and the following disclaimer.
  11. // * Redistributions in binary form must reproduce the above
  12. // copyright notice, this list of conditions and the following disclaimer
  13. // in the documentation and/or other materials provided with the
  14. // distribution.
  15. // * Neither the name of Google Inc. nor the names of its
  16. // contributors may be used to endorse or promote products derived from
  17. // this software without specific prior written permission.
  18. //
  19. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  20. // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  21. // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  22. // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  23. // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  24. // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  25. // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  26. // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  27. // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  28. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  29. // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  30. #endregion
  31. using System;
  32. using System.Diagnostics;
  33. using System.Runtime.CompilerServices;
  34. using System.Runtime.InteropServices;
  35. using System.Threading;
  36. using System.Threading.Tasks;
  37. using Grpc.Core.Internal;
  38. using Grpc.Core.Logging;
  39. using Grpc.Core.Utils;
  40. namespace Grpc.Core.Internal
  41. {
  /// <summary>
  /// Shared base for client-side and server-side call implementations.
  /// Tracks the state of the underlying native call, starts send and read operations on it,
  /// and disposes it once no further operations can take place.
  /// </summary>
  /// <typeparam name="TWrite">Type of the messages written to the call.</typeparam>
  /// <typeparam name="TRead">Type of the messages read from the call.</typeparam>
  internal abstract class AsyncCallBase<TWrite, TRead>
  {
      static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();

      readonly Func<TWrite, byte[]> serializer;
      readonly Func<byte[], TRead> deserializer;

      protected readonly GrpcEnvironment environment;

      // Guards the mutable call state below; every method in this class that reads or
      // writes that state is either taking this lock or is invoked while it is held.
      protected readonly object myLock = new object();

      protected INativeCall call;
      protected bool disposed;  // True once the native call has been disposed by ReleaseResources.
      protected bool started;  // Never assigned in this class; presumably set by derived classes when the call starts.
      protected bool cancelRequested;  // True once Cancel or CancelWithStatus has been invoked.

      protected AsyncCompletionDelegate<object> sendCompletionDelegate;  // Completion of a pending send or sendclose if not null.
      protected AsyncCompletionDelegate<TRead> readCompletionDelegate;  // Completion of a pending read if not null.

      protected bool readingDone;  // True if last read (i.e. read with null payload) was already received.
      protected bool halfcloseRequested;  // True if send close has been initiated (not assigned in this class).
      protected bool finished;  // True if close has been received from the peer (not assigned in this class).
      protected bool initialMetadataSent;  // True once the first send has been started through StartSendMessageInternal.
      protected long streamingWritesCounter;  // Number of streaming send operations started so far.

      /// <summary>
      /// Creates the call base. All three arguments are required and are null-checked.
      /// </summary>
      /// <param name="serializer">Converts an outgoing message to its byte payload.</param>
      /// <param name="deserializer">Converts a received byte payload to a message.</param>
      /// <param name="environment">Environment the call belongs to.</param>
      public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
      {
          this.serializer = Preconditions.CheckNotNull(serializer);
          this.deserializer = Preconditions.CheckNotNull(deserializer);
          this.environment = Preconditions.CheckNotNull(environment);
      }

      /// <summary>
      /// Requests cancelling the call. The call must already be started.
      /// The request is recorded even if the native call has already been disposed;
      /// it is only forwarded to the native call while that call is still alive.
      /// </summary>
      public void Cancel()
      {
          lock (myLock)
          {
              Preconditions.CheckState(started);
              cancelRequested = true;

              if (!disposed)
              {
                  call.Cancel();
              }
          }
      }

      /// <summary>
      /// Requests cancelling the call with given status. The call must already be started.
      /// The request is recorded even if the native call has already been disposed;
      /// it is only forwarded to the native call while that call is still alive.
      /// </summary>
      /// <param name="status">Status passed on to the native call's cancellation.</param>
      public void CancelWithStatus(Status status)
      {
          lock (myLock)
          {
              Preconditions.CheckState(started);
              cancelRequested = true;

              if (!disposed)
              {
                  call.CancelWithStatus(status);
              }
          }
      }

      /// <summary>
      /// Stores the native call this instance operates on.
      /// </summary>
      protected void InitializeInternal(INativeCall call)
      {
          lock (myLock)
          {
              this.call = call;
          }
      }

      /// <summary>
      /// Initiates sending a message. Only one send operation can be active at a time.
      /// completionDelegate is invoked upon completion.
      /// </summary>
      /// <param name="msg">Message to serialize and send.</param>
      /// <param name="writeFlags">Flags handed to the native send operation.</param>
      /// <param name="completionDelegate">Invoked when the send completes; must not be null.</param>
      protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
      {
          // Serialization runs before the lock is taken, so the user-supplied serializer
          // never executes while myLock is held. Serializer exceptions propagate to the caller.
          byte[] payload = UnsafeSerialize(msg);

          lock (myLock)
          {
              Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
              CheckSendingAllowed();

              // The last argument is true only for the first send (presumably asking the
              // native layer to send initial metadata along with it - confirm against INativeCall).
              call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);

              // State is updated only after the native operation was started successfully.
              sendCompletionDelegate = completionDelegate;
              initialMetadataSent = true;
              streamingWritesCounter++;
          }
      }

      /// <summary>
      /// Initiates reading a message. Only one read operation can be active at a time.
      /// completionDelegate is invoked upon completion.
      /// </summary>
      /// <param name="completionDelegate">Invoked when the read completes; must not be null.</param>
      protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
      {
          lock (myLock)
          {
              Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
              CheckReadingAllowed();

              call.StartReceiveMessage(HandleReadFinished);
              readCompletionDelegate = completionDelegate;
          }
      }

      /// <summary>
      /// If there are no more pending actions and no new actions can be started, releases
      /// the underlying native resources. Within this class it is always invoked while
      /// myLock is held.
      /// </summary>
      /// <returns>True if the resources were released by this invocation, false otherwise.</returns>
      protected bool ReleaseResourcesIfPossible()
      {
          if (!disposed && call != null)
          {
              bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
              if (noMoreSendCompletions && readingDone && finished)
              {
                  ReleaseResources();
                  return true;
              }
          }
          return false;
      }

      /// <summary>
      /// Disposes the native call (if any), marks this instance as disposed and
      /// notifies derived classes through OnAfterReleaseResources.
      /// </summary>
      private void ReleaseResources()
      {
          if (call != null)
          {
              call.Dispose();
          }
          disposed = true;
          OnAfterReleaseResources();
      }

      /// <summary>
      /// Hook invoked right after the native resources have been released.
      /// The base implementation does nothing.
      /// </summary>
      protected virtual void OnAfterReleaseResources()
      {
      }

      /// <summary>
      /// Verifies that a new send can be started: the call is started, not cancelled,
      /// not disposed, not halfclosed, not finished, and no other send is pending.
      /// </summary>
      /// <exception cref="OperationCanceledException">Cancellation has been requested.</exception>
      protected void CheckSendingAllowed()
      {
          Preconditions.CheckState(started);
          CheckNotCancelled();
          Preconditions.CheckState(!disposed);

          Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
          Preconditions.CheckState(!finished, "Already finished.");
          Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
      }

      /// <summary>
      /// Verifies that a new read can be started: the call is started, not disposed,
      /// the last read has not been received yet, and no other read is pending.
      /// </summary>
      protected virtual void CheckReadingAllowed()
      {
          Preconditions.CheckState(started);
          Preconditions.CheckState(!disposed);

          Preconditions.CheckState(!readingDone, "Stream has already been closed.");
          Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
      }

      /// <summary>
      /// Throws if cancellation of the call has been requested.
      /// </summary>
      /// <exception cref="OperationCanceledException">Cancellation has been requested.</exception>
      protected void CheckNotCancelled()
      {
          if (cancelRequested)
          {
              throw new OperationCanceledException("Remote call has been cancelled.");
          }
      }

      /// <summary>
      /// Serializes the message. Any exception thrown by the serializer propagates to the caller.
      /// </summary>
      protected byte[] UnsafeSerialize(TWrite msg)
      {
          return serializer(msg);
      }

      /// <summary>
      /// Serializes the message without letting serializer exceptions escape.
      /// </summary>
      /// <param name="msg">Message to serialize.</param>
      /// <param name="payload">Serialized bytes on success, null on failure.</param>
      /// <returns>True on success; false if the serializer threw (the exception is logged).</returns>
      protected bool TrySerialize(TWrite msg, out byte[] payload)
      {
          try
          {
              payload = serializer(msg);
              return true;
          }
          catch (Exception e)
          {
              Logger.Error(e, "Exception occured while trying to serialize message");
              payload = null;
              return false;
          }
      }

      /// <summary>
      /// Deserializes the payload without letting deserializer exceptions escape.
      /// </summary>
      /// <param name="payload">Received bytes.</param>
      /// <param name="msg">Deserialized message on success, default(TRead) on failure.</param>
      /// <returns>True on success; false if the deserializer threw (the exception is logged).</returns>
      protected bool TryDeserialize(byte[] payload, out TRead msg)
      {
          try
          {
              msg = deserializer(payload);
              return true;
          }
          catch (Exception e)
          {
              Logger.Error(e, "Exception occured while trying to deserialize message.");
              msg = default(TRead);
              return false;
          }
      }

      /// <summary>
      /// Invokes the completion delegate with the given result. Exceptions thrown by the
      /// invocation are logged and swallowed so they cannot escape into the caller.
      /// </summary>
      protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
      {
          try
          {
              completionDelegate(value, error);
          }
          catch (Exception e)
          {
              Logger.Error(e, "Exception occured while invoking completion delegate.");
          }
      }

      /// <summary>
      /// Handles send completion.
      /// </summary>
      /// <param name="success">False if the send failed; the delegate then receives an InvalidOperationException.</param>
      protected void HandleSendFinished(bool success)
      {
          CompletePendingSend(success, "Send failed");
      }

      /// <summary>
      /// Handles halfclose completion.
      /// </summary>
      /// <param name="success">False if the halfclose failed; the delegate then receives an InvalidOperationException.</param>
      protected void HandleHalfclosed(bool success)
      {
          CompletePendingSend(success, "Halfclose failed");
      }

      /// <summary>
      /// Common completion path for send and halfclose: clears the pending send delegate
      /// under the lock, releases resources if nothing else is outstanding, and then
      /// notifies the delegate outside the lock.
      /// </summary>
      private void CompletePendingSend(bool success, string failureMessage)
      {
          AsyncCompletionDelegate<object> origCompletionDelegate = null;
          lock (myLock)
          {
              origCompletionDelegate = sendCompletionDelegate;
              sendCompletionDelegate = null;

              ReleaseResourcesIfPossible();
          }

          // The delegate is user code, so it is invoked only after the lock has been released.
          Exception error = success ? null : new InvalidOperationException(failureMessage);
          FireCompletion(origCompletionDelegate, null, error);
      }

      /// <summary>
      /// Handles streaming read completion. A null payload marks the end of the stream and
      /// is reported as default(TRead) with no error. A payload that cannot be deserialized
      /// is reported as an error, so that it cannot be mistaken for a successfully read
      /// message or for the end of the stream.
      /// </summary>
      /// <param name="success">Outcome reported by the native layer (currently not inspected).</param>
      /// <param name="receivedMessage">Received payload, or null if there are no more messages.</param>
      protected void HandleReadFinished(bool success, byte[] receivedMessage)
      {
          AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
          lock (myLock)
          {
              origCompletionDelegate = readCompletionDelegate;
              readCompletionDelegate = null;

              if (receivedMessage == null)
              {
                  // This was the last read.
                  readingDone = true;
              }

              ReleaseResourcesIfPossible();
          }

          // TODO: handle the case when error occured (success == false)...

          if (receivedMessage == null)
          {
              FireCompletion(origCompletionDelegate, default(TRead), null);
              return;
          }

          TRead msg;
          if (!TryDeserialize(receivedMessage, out msg))
          {
              // TryDeserialize has already logged the underlying exception.
              FireCompletion(origCompletionDelegate, default(TRead), new InvalidOperationException("Failed to deserialize message."));
              return;
          }

          FireCompletion(origCompletionDelegate, msg, null);
      }
  }
  306. }