_invocation.py 7.8 KB


  1. # Copyright 2017 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import threading
  16. import grpc
  # Sentinel stored in _HandlerExtras.unary_response until a unary response
  # has been read from the handler; it is compared by identity (`is`).
  17. _NOT_YET_OBSERVED = object()
  # Module-level logger; consume_requests uses it to log exceptions raised
  # while iterating a request iterator.
  18. _LOGGER = logging.getLogger(__name__)
  19. def _cancel(handler):
  20. return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!')
  21. def _is_active(handler):
  22. return handler.is_active()
  23. def _time_remaining(unused_handler):
  24. raise NotImplementedError()
  25. def _add_callback(handler, callback):
  26. return handler.add_callback(callback)
  27. def _initial_metadata(handler):
  28. return handler.initial_metadata()
  29. def _trailing_metadata(handler):
  30. trailing_metadata, unused_code, unused_details = handler.termination()
  31. return trailing_metadata
  32. def _code(handler):
  33. unused_trailing_metadata, code, unused_details = handler.termination()
  34. return code
  35. def _details(handler):
  36. unused_trailing_metadata, unused_code, details = handler.termination()
  37. return details
  38. class _Call(grpc.Call):
  39. def __init__(self, handler):
  40. self._handler = handler
  41. def cancel(self):
  42. _cancel(self._handler)
  43. def is_active(self):
  44. return _is_active(self._handler)
  45. def time_remaining(self):
  46. return _time_remaining(self._handler)
  47. def add_callback(self, callback):
  48. return _add_callback(self._handler, callback)
  49. def initial_metadata(self):
  50. return _initial_metadata(self._handler)
  51. def trailing_metadata(self):
  52. return _trailing_metadata(self._handler)
  53. def code(self):
  54. return _code(self._handler)
  55. def details(self):
  56. return _details(self._handler)
  57. class _RpcErrorCall(grpc.RpcError, grpc.Call):
  58. def __init__(self, handler):
  59. self._handler = handler
  60. def cancel(self):
  61. _cancel(self._handler)
  62. def is_active(self):
  63. return _is_active(self._handler)
  64. def time_remaining(self):
  65. return _time_remaining(self._handler)
  66. def add_callback(self, callback):
  67. return _add_callback(self._handler, callback)
  68. def initial_metadata(self):
  69. return _initial_metadata(self._handler)
  70. def trailing_metadata(self):
  71. return _trailing_metadata(self._handler)
  72. def code(self):
  73. return _code(self._handler)
  74. def details(self):
  75. return _details(self._handler)
  76. def _next(handler):
  77. read = handler.take_response()
  78. if read.code is None:
  79. return read.response
  80. elif read.code is grpc.StatusCode.OK:
  81. raise StopIteration()
  82. else:
  83. raise _RpcErrorCall(handler)
  84. class _HandlerExtras(object):
  85. def __init__(self):
  86. self.condition = threading.Condition()
  87. self.unary_response = _NOT_YET_OBSERVED
  88. self.cancelled = False
  89. def _with_extras_cancel(handler, extras):
  90. with extras.condition:
  91. if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'):
  92. extras.cancelled = True
  93. return True
  94. else:
  95. return False
  96. def _extras_without_cancelled(extras):
  97. with extras.condition:
  98. return extras.cancelled
  99. def _running(handler):
  100. return handler.is_active()
  101. def _done(handler):
  102. return not handler.is_active()
  103. def _with_extras_unary_response(handler, extras):
  104. with extras.condition:
  105. if extras.unary_response is _NOT_YET_OBSERVED:
  106. read = handler.take_response()
  107. if read.code is None:
  108. extras.unary_response = read.response
  109. return read.response
  110. else:
  111. raise _RpcErrorCall(handler)
  112. else:
  113. return extras.unary_response
  114. def _exception(unused_handler):
  115. raise NotImplementedError('TODO!')
  116. def _traceback(unused_handler):
  117. raise NotImplementedError('TODO!')
  118. def _add_done_callback(handler, callback, future):
  119. adapted_callback = lambda: callback(future)
  120. if not handler.add_callback(adapted_callback):
  121. callback(future)
  122. class _FutureCall(grpc.Future, grpc.Call):
  123. def __init__(self, handler, extras):
  124. self._handler = handler
  125. self._extras = extras
  126. def cancel(self):
  127. return _with_extras_cancel(self._handler, self._extras)
  128. def cancelled(self):
  129. return _extras_without_cancelled(self._extras)
  130. def running(self):
  131. return _running(self._handler)
  132. def done(self):
  133. return _done(self._handler)
  134. def result(self):
  135. return _with_extras_unary_response(self._handler, self._extras)
  136. def exception(self):
  137. return _exception(self._handler)
  138. def traceback(self):
  139. return _traceback(self._handler)
  140. def add_done_callback(self, fn):
  141. _add_done_callback(self._handler, fn, self)
  142. def is_active(self):
  143. return _is_active(self._handler)
  144. def time_remaining(self):
  145. return _time_remaining(self._handler)
  146. def add_callback(self, callback):
  147. return _add_callback(self._handler, callback)
  148. def initial_metadata(self):
  149. return _initial_metadata(self._handler)
  150. def trailing_metadata(self):
  151. return _trailing_metadata(self._handler)
  152. def code(self):
  153. return _code(self._handler)
  154. def details(self):
  155. return _details(self._handler)
  156. def consume_requests(request_iterator, handler):
  157. def _consume():
  158. while True:
  159. try:
  160. request = next(request_iterator)
  161. added = handler.add_request(request)
  162. if not added:
  163. break
  164. except StopIteration:
  165. handler.close_requests()
  166. break
  167. except Exception: # pylint: disable=broad-except
  168. details = 'Exception iterating requests!'
  169. _LOGGER.exception(details)
  170. handler.cancel(grpc.StatusCode.UNKNOWN, details)
  171. consumption = threading.Thread(target=_consume)
  172. consumption.start()
  173. def blocking_unary_response(handler):
  174. read = handler.take_response()
  175. if read.code is None:
  176. unused_trailing_metadata, code, unused_details = handler.termination()
  177. if code is grpc.StatusCode.OK:
  178. return read.response
  179. else:
  180. raise _RpcErrorCall(handler)
  181. else:
  182. raise _RpcErrorCall(handler)
  183. def blocking_unary_response_with_call(handler):
  184. read = handler.take_response()
  185. if read.code is None:
  186. unused_trailing_metadata, code, unused_details = handler.termination()
  187. if code is grpc.StatusCode.OK:
  188. return read.response, _Call(handler)
  189. else:
  190. raise _RpcErrorCall(handler)
  191. else:
  192. raise _RpcErrorCall(handler)
  193. def future_call(handler):
  194. return _FutureCall(handler, _HandlerExtras())
  195. class ResponseIteratorCall(grpc.Call):
  196. def __init__(self, handler):
  197. self._handler = handler
  198. def __iter__(self):
  199. return self
  200. def __next__(self):
  201. return _next(self._handler)
  202. def next(self):
  203. return _next(self._handler)
  204. def cancel(self):
  205. _cancel(self._handler)
  206. def is_active(self):
  207. return _is_active(self._handler)
  208. def time_remaining(self):
  209. return _time_remaining(self._handler)
  210. def add_callback(self, callback):
  211. return _add_callback(self._handler, callback)
  212. def initial_metadata(self):
  213. return _initial_metadata(self._handler)
  214. def trailing_metadata(self):
  215. return _trailing_metadata(self._handler)
  216. def code(self):
  217. return _code(self._handler)
  218. def details(self):
  219. return _details(self._handler)