From 2bdcb3df563398ea19c757f1db3d4917d84652c6 Mon Sep 17 00:00:00 2001 From: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> Date: Thu, 23 Nov 2023 16:32:21 +0530 Subject: [PATCH] add an outbound health check Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> --- dapr/aio/clients/__init__.py | 14 +++++ dapr/clients/__init__.py | 26 +++++++++ dapr/clients/http/helpers.py | 81 +++++++++++++++++++++++++++++ setup.cfg | 4 +- tests/clients/test_health_client.py | 45 ++++++++++++++++ 5 files changed, 169 insertions(+), 1 deletion(-) create mode 100644 dapr/clients/http/helpers.py create mode 100644 tests/clients/test_health_client.py diff --git a/dapr/aio/clients/__init__.py b/dapr/aio/clients/__init__.py index db932b20..f096b8f9 100644 --- a/dapr/aio/clients/__init__.py +++ b/dapr/aio/clients/__init__.py @@ -20,6 +20,7 @@ from dapr.aio.clients.grpc.client import DaprGrpcClientAsync, MetadataTuple, InvokeMethodResponse from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient +from dapr.clients.http.helpers import DaprHealthClient from dapr.conf import settings from google.protobuf.message import Message as GrpcMessage @@ -71,6 +72,7 @@ def __init__( """ super().__init__(address, interceptors, max_grpc_message_length) self.invocation_client = None + self.health_client = DaprHealthClient(timeout=http_timeout_seconds) invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper() @@ -131,3 +133,15 @@ async def invoke_method( http_querystring=http_querystring, timeout=timeout ) + + async def wait(self, timeout_s: int): + """Wait for the client to become ready. If the client is already ready, this + method returns immediately. + + Args: + timeout_s (float): The maximum time to wait in seconds. + + Throws: + DaprInternalError: if the timeout expires. + """ + await self.health_client.wait_async(timeout_s) diff --git a/dapr/clients/__init__.py b/dapr/clients/__init__.py index da5f373b..f3d8d772 100644 --- a/dapr/clients/__init__.py +++ b/dapr/clients/__init__.py @@ -19,6 +19,7 @@ from dapr.clients.base import DaprActorClientBase from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN from dapr.clients.grpc.client import DaprGrpcClient, MetadataTuple, InvokeMethodResponse +from dapr.clients.http.helpers import DaprHealthClient from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient from dapr.conf import settings @@ -72,6 +73,7 @@ def __init__( """ super().__init__(address, interceptors, max_grpc_message_length) self.invocation_client = None + self.helath_client = DaprHealthClient(timeout=http_timeout_seconds) invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper() @@ -173,3 +175,27 @@ async def invoke_method_async( else: raise NotImplementedError( 'Please use `dapr.aio.clients.DaprClient` for async invocation') + + def wait(self, timeout_s: float): + """Wait for the client to become ready. If the client is already ready, this + method returns immediately. + + Args: + timeout_s (float): The maximum time to wait in seconds. + + Throws: + DaprInternalError: if the timeout expires. + """ + self.helath_client.wait(int(timeout_s)) + + async def wait_async(self, timeout_s: float): + """Wait for the client to become ready. If the client is already ready, this + method returns immediately. + + Args: + timeout_s (float): The maximum time to wait in seconds. + + Throws: + DaprInternalError: if the timeout expires. + """ + await self.helath_client.wait_async(int(timeout_s)) diff --git a/dapr/clients/http/helpers.py b/dapr/clients/http/helpers.py new file mode 100644 index 00000000..f222a432 --- /dev/null +++ b/dapr/clients/http/helpers.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import asyncio +import time + +from typing import Optional + +from dapr.clients.http.client import DaprHttpClient, USER_AGENT_HEADER, DAPR_USER_AGENT +from dapr.serializers import DefaultJSONSerializer + + +class DaprHealthClient: + """Dapr Health Client""" + + def __init__(self, timeout: Optional[int] = 60): + self._client = DaprHttpClient(DefaultJSONSerializer(), timeout, None, None) + + async def wait_async(self, timeout_s: int): + """Wait for the client to become ready. If the client is already ready, this + method returns immediately. + + Args: + timeout_s (float): The maximum time to wait in seconds. + + Throws: + DaprInternalError: if the timeout expires. + """ + async def make_request() -> bool: + _, r = await self._client.send_bytes( + method='GET', + headers={USER_AGENT_HEADER: DAPR_USER_AGENT}, + url=f'{self._client.get_api_url()}/healthz/outbound', + data=None, + query_params=None, + timeout=timeout_s) + + return r.status >= 200 and r.status < 300 + + start = time.time() + while True: + try: + healthy = await make_request() + if healthy: + return + except Exception as e: + remaining = (start + timeout_s) - time.time() + if remaining < 0: + raise e # This will be DaprInternalError as defined in http/client.py + time.sleep(min(1, remaining)) + + def wait(self, timeout_s: int): + """Wait for the client to become ready. If the client is already ready, this + method returns immediately. + + Args: + timeout_s (float): The maximum time to wait in seconds. + + Throws: + DaprInternalError: if the timeout expires. + """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + awaitable = self.wait_async(timeout_s) + loop.run_until_complete(awaitable) diff --git a/setup.cfg b/setup.cfg index 962e7018..2bbd9a1d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -53,7 +53,9 @@ dapr.serializers = py.typed [flake8] -exclude = +exclude = + .venv, + .env, venv, build, dist, diff --git a/tests/clients/test_health_client.py b/tests/clients/test_health_client.py new file mode 100644 index 00000000..580793ea --- /dev/null +++ b/tests/clients/test_health_client.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import unittest + +from .fake_http_server import FakeHttpServer + +from dapr.clients.http.helpers import DaprHealthClient +from dapr.conf import settings + + +class DaprHealthClientTests(unittest.TestCase): + + def setUp(self): + self.server = FakeHttpServer() + self.server_port = self.server.get_port() + self.server.start() + settings.DAPR_HTTP_PORT = self.server_port + settings.DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http' + self.client = DaprHealthClient() + self.app_id = 'fakeapp' + + def tearDown(self): + self.server.shutdown_server() + settings.DAPR_API_TOKEN = None + settings.DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http' + + def test_wait_ok(self): + self.client.wait(1) + + def test_wait_timeout(self): + self.server.shutdown_server() + with self.assertRaises(Exception): + self.client.wait(1)