health.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Reference implementation for health checking in gRPC Python."""
  15. import collections
  16. import threading
  17. import grpc
  18. from grpc_health.v1 import health_pb2 as _health_pb2
  19. from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
# Fully-qualified name of the 'Health' service, read from the descriptor of
# the generated health_pb2 module.
SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name
  21. class _Watcher():
  22. def __init__(self):
  23. self._condition = threading.Condition()
  24. self._responses = collections.deque()
  25. self._open = True
  26. def __iter__(self):
  27. return self
  28. def _next(self):
  29. with self._condition:
  30. while not self._responses and self._open:
  31. self._condition.wait()
  32. if self._responses:
  33. return self._responses.popleft()
  34. else:
  35. raise StopIteration()
  36. def next(self):
  37. return self._next()
  38. def __next__(self):
  39. return self._next()
  40. def add(self, response):
  41. with self._condition:
  42. self._responses.append(response)
  43. self._condition.notify()
  44. def close(self):
  45. with self._condition:
  46. self._open = False
  47. self._condition.notify()
  48. def _watcher_to_send_response_callback_adapter(watcher):
  49. def send_response_callback(response):
  50. if response is None:
  51. watcher.close()
  52. else:
  53. watcher.add(response)
  54. return send_response_callback
  55. class HealthServicer(_health_pb2_grpc.HealthServicer):
  56. """Servicer handling RPCs for service statuses."""
  57. def __init__(self,
  58. experimental_non_blocking=True,
  59. experimental_thread_pool=None):
  60. self._lock = threading.RLock()
  61. self._server_status = {}
  62. self._send_response_callbacks = {}
  63. self.Watch.__func__.experimental_non_blocking = experimental_non_blocking
  64. self.Watch.__func__.experimental_thread_pool = experimental_thread_pool
  65. self._gracefully_shutting_down = False
  66. def _on_close_callback(self, send_response_callback, service):
  67. def callback():
  68. with self._lock:
  69. self._send_response_callbacks[service].remove(
  70. send_response_callback)
  71. send_response_callback(None)
  72. return callback
  73. def Check(self, request, context):
  74. with self._lock:
  75. status = self._server_status.get(request.service)
  76. if status is None:
  77. context.set_code(grpc.StatusCode.NOT_FOUND)
  78. return _health_pb2.HealthCheckResponse()
  79. else:
  80. return _health_pb2.HealthCheckResponse(status=status)
  81. # pylint: disable=arguments-differ
  82. def Watch(self, request, context, send_response_callback=None):
  83. blocking_watcher = None
  84. if send_response_callback is None:
  85. # The server does not support the experimental_non_blocking
  86. # parameter. For backwards compatibility, return a blocking response
  87. # generator.
  88. blocking_watcher = _Watcher()
  89. send_response_callback = _watcher_to_send_response_callback_adapter(
  90. blocking_watcher)
  91. service = request.service
  92. with self._lock:
  93. status = self._server_status.get(service)
  94. if status is None:
  95. status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member
  96. send_response_callback(
  97. _health_pb2.HealthCheckResponse(status=status))
  98. if service not in self._send_response_callbacks:
  99. self._send_response_callbacks[service] = set()
  100. self._send_response_callbacks[service].add(send_response_callback)
  101. context.add_callback(
  102. self._on_close_callback(send_response_callback, service))
  103. return blocking_watcher
  104. def set(self, service, status):
  105. """Sets the status of a service.
  106. Args:
  107. service: string, the name of the service. NOTE, '' must be set.
  108. status: HealthCheckResponse.status enum value indicating the status of
  109. the service
  110. """
  111. with self._lock:
  112. if self._gracefully_shutting_down:
  113. return
  114. else:
  115. self._server_status[service] = status
  116. if service in self._send_response_callbacks:
  117. for send_response_callback in self._send_response_callbacks[
  118. service]:
  119. send_response_callback(
  120. _health_pb2.HealthCheckResponse(status=status))
  121. def enter_graceful_shutdown(self):
  122. """Permanently sets the status of all services to NOT_SERVING.
  123. This should be invoked when the server is entering a graceful shutdown
  124. period. After this method is invoked, future attempts to set the status
  125. of a service will be ignored.
  126. This is an EXPERIMENTAL API.
  127. """
  128. with self._lock:
  129. if self._gracefully_shutting_down:
  130. return
  131. else:
  132. for service in self._server_status:
  133. self.set(service,
  134. _health_pb2.HealthCheckResponse.NOT_SERVING) # pylint: disable=no-member
  135. self._gracefully_shutting_down = True