Kaynağa Gözat

Revert "Report every health state change"

This reverts commit b2d8509f78c9113980380205cb6c62553ab4875f.
Lidi Zheng 5 yıl önce
ebeveyn
işleme
a729bd1828

+ 28 - 34
src/python/grpcio_health_checking/grpc_health/v1/_async.py

@@ -15,7 +15,6 @@
 
 import asyncio
 import collections
-from typing import Mapping, AbstractSet
 
 import grpc
 
@@ -25,14 +24,10 @@ from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
 
 class AsyncHealthServicer(_health_pb2_grpc.HealthServicer):
     """An AsyncIO implementation of health checking servicer."""
-    _server_status: Mapping[str,
-                            '_health_pb2.HealthCheckResponse.ServingStatus']
-    _server_watchers: Mapping[str, AbstractSet[asyncio.Queue]]
-    _gracefully_shutting_down: bool
 
     def __init__(self):
         self._server_status = dict()
-        self._server_watchers = collections.defaultdict(set)
+        self._server_watchers = collections.defaultdict(asyncio.Condition)
         self._gracefully_shutting_down = False
 
     async def Check(self, request: _health_pb2.HealthCheckRequest, context):
@@ -44,37 +39,35 @@ class AsyncHealthServicer(_health_pb2_grpc.HealthServicer):
             return _health_pb2.HealthCheckResponse(status=status)
 
     async def Watch(self, request: _health_pb2.HealthCheckRequest, context):
-        queue = asyncio.Queue()
-        self._server_watchers[request.service].add(queue)
-
+        condition = self._server_watchers[request.service]
         try:
-            status = self._server_status.get(
-                request.service,
-                _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN)
-            while True:
-                # Responds with current health state
-                await context.write(
-                    _health_pb2.HealthCheckResponse(status=status))
-
-                # Polling on health state changes
-                status = await queue.get()
+            async with condition:
+                while True:
+                    status = self._server_status.get(
+                        request.service,
+                        _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN)
+
+                    # Responds with current health state
+                    await context.write(
+                        _health_pb2.HealthCheckResponse(status=status))
+
+                    # Polling on health state changes
+                    await condition.wait()
         finally:
-            self._server_watchers[request.service].remove(queue)
-            if not self._server_watchers[request.service]:
-                del self._server_watchers[request.service]
-
-    def _set(self, service: str,
-             status: _health_pb2.HealthCheckResponse.ServingStatus):
-        self._server_status[service] = status
+            del self._server_watchers[request.service]
 
+    async def _set(self, service: str,
+                   status: _health_pb2.HealthCheckResponse.ServingStatus):
         if service in self._server_watchers:
-            # Only iterate through the watchers if there is at least one.
-            # Otherwise, it creates empty sets.
-            for watcher in self._server_watchers[service]:
-                watcher.put_nowait(status)
+            condition = self._server_watchers.get(service)
+            async with condition:
+                self._server_status[service] = status
+                condition.notify_all()
+        else:
+            self._server_status[service] = status
 
-    def set(self, service: str,
-            status: _health_pb2.HealthCheckResponse.ServingStatus):
+    async def set(self, service: str,
+                  status: _health_pb2.HealthCheckResponse.ServingStatus):
         """Sets the status of a service.
 
         Args:
@@ -85,7 +78,7 @@ class AsyncHealthServicer(_health_pb2_grpc.HealthServicer):
         if self._gracefully_shutting_down:
             return
         else:
-            self._set(service, status)
+            await self._set(service, status)
 
     async def enter_graceful_shutdown(self):
         """Permanently sets the status of all services to NOT_SERVING.
@@ -101,4 +94,5 @@ class AsyncHealthServicer(_health_pb2_grpc.HealthServicer):
         else:
             self._gracefully_shutting_down = True
             for service in self._server_status:
-                self._set(service, _health_pb2.HealthCheckResponse.NOT_SERVING)
+                await self._set(service,
+                                _health_pb2.HealthCheckResponse.NOT_SERVING)

+ 18 - 18
src/python/grpcio_tests/tests_aio/health_check/health_servicer_test.py

@@ -44,13 +44,13 @@ class HealthServicerTest(AioTestBase):
 
     async def setUp(self):
         self._servicer = health.AsyncHealthServicer()
-        self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
-        self._servicer.set(_SERVING_SERVICE,
-                           health_pb2.HealthCheckResponse.SERVING)
-        self._servicer.set(_UNKNOWN_SERVICE,
-                           health_pb2.HealthCheckResponse.UNKNOWN)
-        self._servicer.set(_NOT_SERVING_SERVICE,
-                           health_pb2.HealthCheckResponse.NOT_SERVING)
+        await self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
+        await self._servicer.set(_SERVING_SERVICE,
+                                 health_pb2.HealthCheckResponse.SERVING)
+        await self._servicer.set(_UNKNOWN_SERVICE,
+                                 health_pb2.HealthCheckResponse.UNKNOWN)
+        await self._servicer.set(_NOT_SERVING_SERVICE,
+                                 health_pb2.HealthCheckResponse.NOT_SERVING)
         self._server = aio.server()
         port = self._server.add_insecure_port('[::]:0')
         health_pb2_grpc.add_HealthServicer_to_server(self._servicer,
@@ -118,13 +118,13 @@ class HealthServicerTest(AioTestBase):
         self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                          (await queue.get()).status)
 
-        self._servicer.set(_WATCH_SERVICE,
-                           health_pb2.HealthCheckResponse.SERVING)
+        await self._servicer.set(_WATCH_SERVICE,
+                                 health_pb2.HealthCheckResponse.SERVING)
         self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                          (await queue.get()).status)
 
-        self._servicer.set(_WATCH_SERVICE,
-                           health_pb2.HealthCheckResponse.NOT_SERVING)
+        await self._servicer.set(_WATCH_SERVICE,
+                                 health_pb2.HealthCheckResponse.NOT_SERVING)
         self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                          (await queue.get()).status)
 
@@ -141,8 +141,8 @@ class HealthServicerTest(AioTestBase):
         self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                          (await queue.get()).status)
 
-        self._servicer.set('some-other-service',
-                           health_pb2.HealthCheckResponse.SERVING)
+        await self._servicer.set('some-other-service',
+                                 health_pb2.HealthCheckResponse.SERVING)
         # The change of health status in other service should be isolated.
         # Hence, no additional notification should be observed.
         with self.assertRaises(asyncio.TimeoutError):
@@ -166,8 +166,8 @@ class HealthServicerTest(AioTestBase):
         self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                          (await queue2.get()).status)
 
-        self._servicer.set(_WATCH_SERVICE,
-                           health_pb2.HealthCheckResponse.SERVING)
+        await self._servicer.set(_WATCH_SERVICE,
+                                 health_pb2.HealthCheckResponse.SERVING)
         self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                          (await queue1.get()).status)
         self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
@@ -190,8 +190,8 @@ class HealthServicerTest(AioTestBase):
                          (await queue.get()).status)
 
         call.cancel()
-        self._servicer.set(_WATCH_SERVICE,
-                           health_pb2.HealthCheckResponse.SERVING)
+        await self._servicer.set(_WATCH_SERVICE,
+                                 health_pb2.HealthCheckResponse.SERVING)
         await task
 
         # Wait for the serving coroutine to process client cancellation.
@@ -216,7 +216,7 @@ class HealthServicerTest(AioTestBase):
                          (await queue.get()).status)
 
         # This should be a no-op.
-        self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
+        await self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
 
         resp = await self._stub.Check(request)
         self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,