Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into add-environment-variables
Browse files Browse the repository at this point in the history
dimastbk authored Nov 6, 2024
2 parents 5e51c7a + 9563192 commit f647ecd
Showing 4 changed files with 20 additions and 10 deletions.
8 changes: 7 additions & 1 deletion pyzeebe/grpc_internals/zeebe_adapter_base.py
Original file line number Diff line number Diff line change
@@ -21,11 +21,15 @@ class ZeebeAdapterBase:
def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = -1):
self._channel = grpc_channel
self._gateway_stub = GatewayStub(grpc_channel)
self.connected = True
self._connected = True
self.retrying_connection = False
self._max_connection_retries = max_connection_retries
self._current_connection_retries = 0

@property
def connected(self) -> bool:
return self._connected

def _should_retry(self) -> bool:
return self._max_connection_retries == -1 or self._current_connection_retries < self._max_connection_retries

@@ -44,6 +48,8 @@ async def _close(self) -> None:
await self._channel.close()
except Exception as exception:
logger.exception("Failed to close channel, %s exception was raised", type(exception).__name__)
finally:
self._connected = False


def _create_pyzeebe_error_from_grpc_error(grpc_error: grpc.aio.AioRpcError) -> PyZeebeError:
11 changes: 7 additions & 4 deletions tests/unit/grpc_internals/zeebe_adapter_base_test.py
Original file line number Diff line number Diff line change
@@ -69,18 +69,21 @@ async def test_raises_unkown_grpc_status_code_on_unkown_status_code(
async def test_closes_after_retries_exceeded(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.UNAVAILABLE, None, None)

zeebe_adapter._close = AsyncMock()
zeebe_adapter._channel.close = AsyncMock()
zeebe_adapter._max_connection_retries = 1
with pytest.raises(ZeebeGatewayUnavailableError):
await zeebe_adapter._handle_grpc_error(error)

zeebe_adapter._close.assert_called_once()
assert zeebe_adapter.connected is False
zeebe_adapter._channel.close.assert_awaited_once()

async def test_closes_after_internal_error(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.INTERNAL, None, None)
zeebe_adapter._close = AsyncMock()

zeebe_adapter._channel.close = AsyncMock()
zeebe_adapter._max_connection_retries = 1
with pytest.raises(ZeebeInternalError):
await zeebe_adapter._handle_grpc_error(error)

zeebe_adapter._close.assert_called_once()
assert zeebe_adapter.connected is False
zeebe_adapter._channel.close.assert_awaited_once()
4 changes: 2 additions & 2 deletions tests/unit/worker/job_poller_test.py
Original file line number Diff line number Diff line change
@@ -46,13 +46,13 @@ async def test_job_is_added_to_task_state(

class TestShouldPoll:
def test_should_poll_returns_expected_result_when_disconnected(self, job_poller: JobPoller):
job_poller.zeebe_adapter.connected = False
job_poller.zeebe_adapter._connected = False
job_poller.zeebe_adapter.retrying_connection = False

assert not job_poller.should_poll()

def test_continues_polling_when_retrying_connection(self, job_poller: JobPoller):
job_poller.zeebe_adapter.connected = False
job_poller.zeebe_adapter._connected = False
job_poller.zeebe_adapter.retrying_connection = True

assert job_poller.should_poll()
7 changes: 4 additions & 3 deletions tests/unit/worker/worker_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from unittest.mock import AsyncMock, Mock
from uuid import uuid4

@@ -277,12 +278,12 @@ async def test_poller_failed(self, zeebe_worker: ZeebeWorker):
async def test_second_poller_should_cancel(self, zeebe_worker: ZeebeWorker):
zeebe_worker._init_tasks = Mock()

poller2_cancel_event = anyio.Event()
poller2_cancel_event = asyncio.Event()

async def poll2():
try:
await anyio.Event().wait()
except anyio.get_cancelled_exc_class():
await asyncio.Event().wait()
except asyncio.CancelledError:
poller2_cancel_event.set()

poller_mock = AsyncMock(spec_set=JobPoller, poll=AsyncMock(side_effect=[Exception("test_exception")]))

0 comments on commit f647ecd

Please sign in to comment.