ServerCallHandler.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. #region Copyright notice and license
  2. // Copyright 2015, Google Inc.
  3. // All rights reserved.
  4. //
  5. // Redistribution and use in source and binary forms, with or without
  6. // modification, are permitted provided that the following conditions are
  7. // met:
  8. //
  9. // * Redistributions of source code must retain the above copyright
  10. // notice, this list of conditions and the following disclaimer.
  11. // * Redistributions in binary form must reproduce the above
  12. // copyright notice, this list of conditions and the following disclaimer
  13. // in the documentation and/or other materials provided with the
  14. // distribution.
  15. // * Neither the name of Google Inc. nor the names of its
  16. // contributors may be used to endorse or promote products derived from
  17. // this software without specific prior written permission.
  18. //
  19. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  20. // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  21. // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  22. // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  23. // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  24. // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  25. // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  26. // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  27. // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  28. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  29. // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  30. #endregion
  31. using System;
  32. using System.Linq;
  33. using System.Threading.Tasks;
  34. using Grpc.Core.Internal;
  35. using Grpc.Core.Utils;
  36. namespace Grpc.Core.Internal
  37. {
  /// <summary>
  /// Contract implemented by every server-side call handler in this file.
  /// </summary>
  internal interface IServerCallHandler
  {
      /// <summary>
      /// Processes a single incoming call.
      /// </summary>
      /// <param name="methodName">Method name supplied by the caller; none of the handlers in this file read it.</param>
      /// <param name="call">Native call handle the handler works with.</param>
      /// <param name="cq">Completion queue handle supplied by the caller; none of the handlers in this file read it.</param>
      /// <returns>A task representing the handling of the call.</returns>
      Task HandleCall(
          string methodName,
          CallSafeHandle call,
          CompletionQueueSafeHandle cq);
  }
  /// <summary>
  /// Handles calls that consist of exactly one request message and one response message.
  /// </summary>
  /// <typeparam name="TRequest">Request message type.</typeparam>
  /// <typeparam name="TResponse">Response message type.</typeparam>
  internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
  {
      readonly Method<TRequest, TResponse> method;
      readonly UnaryServerMethod<TRequest, TResponse> handler;

      /// <summary>
      /// Creates the call handler.
      /// </summary>
      /// <param name="method">Method descriptor providing the request/response marshallers.</param>
      /// <param name="handler">User callback that turns a request into a response.</param>
      /// <exception cref="ArgumentNullException">If <paramref name="method"/> or <paramref name="handler"/> is null.</exception>
      public UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler)
      {
          // Fail fast at construction time instead of with a NullReferenceException while serving a call.
          if (method == null)
          {
              throw new ArgumentNullException("method");
          }
          if (handler == null)
          {
              throw new ArgumentNullException("handler");
          }
          this.method = method;
          this.handler = handler;
      }

      /// <summary>
      /// Reads the single request, invokes the handler, writes its result and then the final status.
      /// </summary>
      /// <param name="methodName">Not used by this handler.</param>
      /// <param name="call">Native call handle used to initialize the server-side call.</param>
      /// <param name="cq">Not used by this handler.</param>
      /// <returns>A task that completes after the task returned by <c>ServerSideCallAsync</c> has completed.</returns>
      public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
      {
          var asyncCall = new AsyncCallServer<TRequest, TResponse>(
              method.ResponseMarshaller.Serializer,
              method.RequestMarshaller.Deserializer);

          asyncCall.Initialize(call);
          var finishedTask = asyncCall.ServerSideCallAsync();
          var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
          var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);

          Status status = Status.DefaultSuccess;
          try
          {
              var request = await requestStream.ReadNext();
              // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
              // The second read is required to yield null, i.e. exactly one request is accepted.
              Preconditions.CheckArgument(await requestStream.ReadNext() == null);
              var result = await handler(request);
              try
              {
                  await responseStream.Write(result);
              }
              catch (OperationCanceledException)
              {
                  // Same treatment as ClientStreamingServerCallHandler: a cancelled response
                  // write is not a handler failure, so report it as cancelled.
                  status = Status.DefaultCancelled;
              }
          }
          catch (Exception e)
          {
              // Any failure while reading the request or running the handler becomes an error status.
              Console.WriteLine("Exception occurred in handler: " + e);
              status = HandlerUtils.StatusFromException(e);
          }

          try
          {
              await responseStream.WriteStatus(status);
          }
          catch (OperationCanceledException)
          {
              // Call has been already cancelled.
          }
          await finishedTask;
      }
  }
  /// <summary>
  /// Handles calls that consist of one request message and a handler-written stream of responses.
  /// </summary>
  /// <typeparam name="TRequest">Request message type.</typeparam>
  /// <typeparam name="TResponse">Response message type.</typeparam>
  internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
  {
      readonly Method<TRequest, TResponse> method;
      readonly ServerStreamingServerMethod<TRequest, TResponse> handler;

      /// <summary>
      /// Creates the call handler.
      /// </summary>
      /// <param name="method">Method descriptor providing the request/response marshallers.</param>
      /// <param name="handler">User callback that receives the request and the response stream.</param>
      /// <exception cref="ArgumentNullException">If <paramref name="method"/> or <paramref name="handler"/> is null.</exception>
      public ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler)
      {
          // Fail fast at construction time instead of with a NullReferenceException while serving a call.
          if (method == null)
          {
              throw new ArgumentNullException("method");
          }
          if (handler == null)
          {
              throw new ArgumentNullException("handler");
          }
          this.method = method;
          this.handler = handler;
      }

      /// <summary>
      /// Reads the single request, lets the handler write to the response stream, then writes the final status.
      /// </summary>
      /// <param name="methodName">Not used by this handler.</param>
      /// <param name="call">Native call handle used to initialize the server-side call.</param>
      /// <param name="cq">Not used by this handler.</param>
      /// <returns>A task that completes after the task returned by <c>ServerSideCallAsync</c> has completed.</returns>
      public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
      {
          var asyncCall = new AsyncCallServer<TRequest, TResponse>(
              method.ResponseMarshaller.Serializer,
              method.RequestMarshaller.Deserializer);

          asyncCall.Initialize(call);
          var finishedTask = asyncCall.ServerSideCallAsync();
          var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
          var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);

          Status status = Status.DefaultSuccess;
          try
          {
              var request = await requestStream.ReadNext();
              // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
              // The second read is required to yield null, i.e. exactly one request is accepted.
              Preconditions.CheckArgument(await requestStream.ReadNext() == null);
              await handler(request, responseStream);
          }
          catch (Exception e)
          {
              // Any failure while reading the request or running the handler becomes an error status.
              Console.WriteLine("Exception occurred in handler: " + e);
              status = HandlerUtils.StatusFromException(e);
          }

          try
          {
              await responseStream.WriteStatus(status);
          }
          catch (OperationCanceledException)
          {
              // Call has been already cancelled.
          }
          await finishedTask;
      }
  }
  /// <summary>
  /// Handles calls that consist of a stream of requests consumed by the handler and a single response.
  /// </summary>
  /// <typeparam name="TRequest">Request message type.</typeparam>
  /// <typeparam name="TResponse">Response message type.</typeparam>
  internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
  {
      readonly Method<TRequest, TResponse> method;
      readonly ClientStreamingServerMethod<TRequest, TResponse> handler;

      /// <summary>
      /// Creates the call handler.
      /// </summary>
      /// <param name="method">Method descriptor providing the request/response marshallers.</param>
      /// <param name="handler">User callback that consumes the request stream and produces the response.</param>
      /// <exception cref="ArgumentNullException">If <paramref name="method"/> or <paramref name="handler"/> is null.</exception>
      public ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler)
      {
          // Fail fast at construction time instead of with a NullReferenceException while serving a call.
          if (method == null)
          {
              throw new ArgumentNullException("method");
          }
          if (handler == null)
          {
              throw new ArgumentNullException("handler");
          }
          this.method = method;
          this.handler = handler;
      }

      /// <summary>
      /// Passes the request stream to the handler, writes its result and then the final status.
      /// </summary>
      /// <param name="methodName">Not used by this handler.</param>
      /// <param name="call">Native call handle used to initialize the server-side call.</param>
      /// <param name="cq">Not used by this handler.</param>
      /// <returns>A task that completes after the task returned by <c>ServerSideCallAsync</c> has completed.</returns>
      public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
      {
          var asyncCall = new AsyncCallServer<TRequest, TResponse>(
              method.ResponseMarshaller.Serializer,
              method.RequestMarshaller.Deserializer);

          asyncCall.Initialize(call);
          var finishedTask = asyncCall.ServerSideCallAsync();
          var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
          var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);

          Status status = Status.DefaultSuccess;
          try
          {
              var result = await handler(requestStream);
              try
              {
                  await responseStream.Write(result);
              }
              catch (OperationCanceledException)
              {
                  // A cancelled response write is reported as cancelled rather than as a handler failure.
                  status = Status.DefaultCancelled;
              }
          }
          catch (Exception e)
          {
              // Any failure while running the handler becomes an error status.
              Console.WriteLine("Exception occurred in handler: " + e);
              status = HandlerUtils.StatusFromException(e);
          }

          try
          {
              await responseStream.WriteStatus(status);
          }
          catch (OperationCanceledException)
          {
              // Call has been already cancelled.
          }
          await finishedTask;
      }
  }
  /// <summary>
  /// Handles calls in which the handler both consumes a stream of requests and writes a stream of responses.
  /// </summary>
  /// <typeparam name="TRequest">Request message type.</typeparam>
  /// <typeparam name="TResponse">Response message type.</typeparam>
  internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
  {
      readonly Method<TRequest, TResponse> method;
      readonly DuplexStreamingServerMethod<TRequest, TResponse> handler;

      /// <summary>
      /// Creates the call handler.
      /// </summary>
      /// <param name="method">Method descriptor providing the request/response marshallers.</param>
      /// <param name="handler">User callback that receives the request stream and the response stream.</param>
      /// <exception cref="ArgumentNullException">If <paramref name="method"/> or <paramref name="handler"/> is null.</exception>
      public DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler)
      {
          // Fail fast at construction time instead of with a NullReferenceException while serving a call.
          if (method == null)
          {
              throw new ArgumentNullException("method");
          }
          if (handler == null)
          {
              throw new ArgumentNullException("handler");
          }
          this.method = method;
          this.handler = handler;
      }

      /// <summary>
      /// Hands both streams to the handler and writes the final status once the handler has finished.
      /// </summary>
      /// <param name="methodName">Not used by this handler.</param>
      /// <param name="call">Native call handle used to initialize the server-side call.</param>
      /// <param name="cq">Not used by this handler.</param>
      /// <returns>A task that completes after the task returned by <c>ServerSideCallAsync</c> has completed.</returns>
      public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
      {
          var asyncCall = new AsyncCallServer<TRequest, TResponse>(
              method.ResponseMarshaller.Serializer,
              method.RequestMarshaller.Deserializer);

          asyncCall.Initialize(call);
          var finishedTask = asyncCall.ServerSideCallAsync();
          var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
          var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);

          Status status = Status.DefaultSuccess;
          try
          {
              await handler(requestStream, responseStream);
          }
          catch (Exception e)
          {
              // Any failure while running the handler becomes an error status.
              Console.WriteLine("Exception occurred in handler: " + e);
              status = HandlerUtils.StatusFromException(e);
          }

          try
          {
              await responseStream.WriteStatus(status);
          }
          catch (OperationCanceledException)
          {
              // Call has been already cancelled.
          }
          await finishedTask;
      }
  }
  /// <summary>
  /// Call handler that answers every call with an <c>Unimplemented</c> status.
  /// </summary>
  internal class NoSuchMethodCallHandler : IServerCallHandler
  {
      /// <summary>
      /// Writes the "No such method." status, then reads whatever the client has sent.
      /// </summary>
      /// <param name="methodName">Not used by this handler.</param>
      /// <param name="call">Native call handle used to initialize the server-side call.</param>
      /// <param name="cq">Not used by this handler.</param>
      /// <returns>A task that completes after the task returned by <c>ServerSideCallAsync</c> has completed.</returns>
      public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
      {
          // We don't care about the payload type here.
          var asyncCall = new AsyncCallServer<byte[], byte[]>(
              (payload) => payload, (payload) => payload);

          asyncCall.Initialize(call);
          var finishedTask = asyncCall.ServerSideCallAsync();
          var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall);
          var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);

          try
          {
              await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method."));
          }
          catch (OperationCanceledException)
          {
              // Call has been already cancelled. Swallowed, like in the other handlers in this
              // file, so that the request stream is still drained and finishedTask still awaited.
          }

          // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed.
          await requestStream.ToList();
          await finishedTask;
      }
  }
  /// <summary>
  /// Helpers shared by the server call handlers.
  /// </summary>
  internal static class HandlerUtils
  {
      /// <summary>
      /// Produces the status reported to the client when a handler has thrown.
      /// </summary>
      /// <param name="e">The exception thrown by the handler; currently not inspected.</param>
      /// <returns>A status with code <c>Unknown</c> and a fixed detail message.</returns>
      public static Status StatusFromException(Exception e)
      {
          // TODO(jtattermusch): what is the right status code here?
          var unknownStatus = new Status(StatusCode.Unknown, "Exception was thrown by handler.");
          return unknownStatus;
      }
  }
  238. }