_invocation.py 7.9 KB


  1. # Copyright 2017 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import threading
  16. import grpc
# Sentinel stored in _HandlerExtras.unary_response until a unary response has
# been read from the handler; compared by identity in
# _with_extras_unary_response.
_NOT_YET_OBSERVED = object()

# NOTE(review): basicConfig() runs as an import-time side effect of this
# module; confirm that is intended before relying on or removing it.
logging.basicConfig()
# Module-level logger; used by consume_requests to report iterator failures.
_LOGGER = logging.getLogger(__name__)
  20. def _cancel(handler):
  21. return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!')
  22. def _is_active(handler):
  23. return handler.is_active()
  24. def _time_remaining(unused_handler):
  25. raise NotImplementedError()
  26. def _add_callback(handler, callback):
  27. return handler.add_callback(callback)
  28. def _initial_metadata(handler):
  29. return handler.initial_metadata()
  30. def _trailing_metadata(handler):
  31. trailing_metadata, unused_code, unused_details = handler.termination()
  32. return trailing_metadata
  33. def _code(handler):
  34. unused_trailing_metadata, code, unused_details = handler.termination()
  35. return code
  36. def _details(handler):
  37. unused_trailing_metadata, unused_code, details = handler.termination()
  38. return details
  39. class _Call(grpc.Call):
  40. def __init__(self, handler):
  41. self._handler = handler
  42. def cancel(self):
  43. _cancel(self._handler)
  44. def is_active(self):
  45. return _is_active(self._handler)
  46. def time_remaining(self):
  47. return _time_remaining(self._handler)
  48. def add_callback(self, callback):
  49. return _add_callback(self._handler, callback)
  50. def initial_metadata(self):
  51. return _initial_metadata(self._handler)
  52. def trailing_metadata(self):
  53. return _trailing_metadata(self._handler)
  54. def code(self):
  55. return _code(self._handler)
  56. def details(self):
  57. return _details(self._handler)
  58. class _RpcErrorCall(grpc.RpcError, grpc.Call):
  59. def __init__(self, handler):
  60. self._handler = handler
  61. def cancel(self):
  62. _cancel(self._handler)
  63. def is_active(self):
  64. return _is_active(self._handler)
  65. def time_remaining(self):
  66. return _time_remaining(self._handler)
  67. def add_callback(self, callback):
  68. return _add_callback(self._handler, callback)
  69. def initial_metadata(self):
  70. return _initial_metadata(self._handler)
  71. def trailing_metadata(self):
  72. return _trailing_metadata(self._handler)
  73. def code(self):
  74. return _code(self._handler)
  75. def details(self):
  76. return _details(self._handler)
  77. def _next(handler):
  78. read = handler.take_response()
  79. if read.code is None:
  80. return read.response
  81. elif read.code is grpc.StatusCode.OK:
  82. raise StopIteration()
  83. else:
  84. raise _RpcErrorCall(handler)
  85. class _HandlerExtras(object):
  86. def __init__(self):
  87. self.condition = threading.Condition()
  88. self.unary_response = _NOT_YET_OBSERVED
  89. self.cancelled = False
  90. def _with_extras_cancel(handler, extras):
  91. with extras.condition:
  92. if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'):
  93. extras.cancelled = True
  94. return True
  95. else:
  96. return False
  97. def _extras_without_cancelled(extras):
  98. with extras.condition:
  99. return extras.cancelled
  100. def _running(handler):
  101. return handler.is_active()
  102. def _done(handler):
  103. return not handler.is_active()
  104. def _with_extras_unary_response(handler, extras):
  105. with extras.condition:
  106. if extras.unary_response is _NOT_YET_OBSERVED:
  107. read = handler.take_response()
  108. if read.code is None:
  109. extras.unary_response = read.response
  110. return read.response
  111. else:
  112. raise _RpcErrorCall(handler)
  113. else:
  114. return extras.unary_response
  115. def _exception(unused_handler):
  116. raise NotImplementedError('TODO!')
  117. def _traceback(unused_handler):
  118. raise NotImplementedError('TODO!')
  119. def _add_done_callback(handler, callback, future):
  120. adapted_callback = lambda: callback(future)
  121. if not handler.add_callback(adapted_callback):
  122. callback(future)
  123. class _FutureCall(grpc.Future, grpc.Call):
  124. def __init__(self, handler, extras):
  125. self._handler = handler
  126. self._extras = extras
  127. def cancel(self):
  128. return _with_extras_cancel(self._handler, self._extras)
  129. def cancelled(self):
  130. return _extras_without_cancelled(self._extras)
  131. def running(self):
  132. return _running(self._handler)
  133. def done(self):
  134. return _done(self._handler)
  135. def result(self):
  136. return _with_extras_unary_response(self._handler, self._extras)
  137. def exception(self):
  138. return _exception(self._handler)
  139. def traceback(self):
  140. return _traceback(self._handler)
  141. def add_done_callback(self, fn):
  142. _add_done_callback(self._handler, fn, self)
  143. def is_active(self):
  144. return _is_active(self._handler)
  145. def time_remaining(self):
  146. return _time_remaining(self._handler)
  147. def add_callback(self, callback):
  148. return _add_callback(self._handler, callback)
  149. def initial_metadata(self):
  150. return _initial_metadata(self._handler)
  151. def trailing_metadata(self):
  152. return _trailing_metadata(self._handler)
  153. def code(self):
  154. return _code(self._handler)
  155. def details(self):
  156. return _details(self._handler)
  157. def consume_requests(request_iterator, handler):
  158. def _consume():
  159. while True:
  160. try:
  161. request = next(request_iterator)
  162. added = handler.add_request(request)
  163. if not added:
  164. break
  165. except StopIteration:
  166. handler.close_requests()
  167. break
  168. except Exception: # pylint: disable=broad-except
  169. details = 'Exception iterating requests!'
  170. _LOGGER.exception(details)
  171. handler.cancel(grpc.StatusCode.UNKNOWN, details)
  172. consumption = threading.Thread(target=_consume)
  173. consumption.start()
  174. def blocking_unary_response(handler):
  175. read = handler.take_response()
  176. if read.code is None:
  177. unused_trailing_metadata, code, unused_details = handler.termination()
  178. if code is grpc.StatusCode.OK:
  179. return read.response
  180. else:
  181. raise _RpcErrorCall(handler)
  182. else:
  183. raise _RpcErrorCall(handler)
  184. def blocking_unary_response_with_call(handler):
  185. read = handler.take_response()
  186. if read.code is None:
  187. unused_trailing_metadata, code, unused_details = handler.termination()
  188. if code is grpc.StatusCode.OK:
  189. return read.response, _Call(handler)
  190. else:
  191. raise _RpcErrorCall(handler)
  192. else:
  193. raise _RpcErrorCall(handler)
  194. def future_call(handler):
  195. return _FutureCall(handler, _HandlerExtras())
  196. class ResponseIteratorCall(grpc.Call):
  197. def __init__(self, handler):
  198. self._handler = handler
  199. def __iter__(self):
  200. return self
  201. def __next__(self):
  202. return _next(self._handler)
  203. def next(self):
  204. return _next(self._handler)
  205. def cancel(self):
  206. _cancel(self._handler)
  207. def is_active(self):
  208. return _is_active(self._handler)
  209. def time_remaining(self):
  210. return _time_remaining(self._handler)
  211. def add_callback(self, callback):
  212. return _add_callback(self._handler, callback)
  213. def initial_metadata(self):
  214. return _initial_metadata(self._handler)
  215. def trailing_metadata(self):
  216. return _trailing_metadata(self._handler)
  217. def code(self):
  218. return _code(self._handler)
  219. def details(self):
  220. return _details(self._handler)