Skip to content

Commit

Permalink
Catch error in sdk when workflow instance not found (#771)
Browse files Browse the repository at this point in the history
* Catch error in sdk when workflow instance not found

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* fixes demo workflow example test

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Only return None for the correct error

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds test

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Linter

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Extends test

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>
elena-kolevska authored Jan 28, 2025
1 parent 7d05d6f commit 488189e
Showing 4 changed files with 54 additions and 3 deletions.
4 changes: 4 additions & 0 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
@@ -139,13 +139,17 @@ def main():

# Pause Test
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
sleep(3)

get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
print(f'Get response from {workflow_name} after pause call: {get_response.runtime_status}')

# Resume Test
d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
sleep(3)

get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
1 change: 1 addition & 0 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ def send_alert(ctx, message: str):
status = wf_client.get_workflow_state(job_id)
except Exception:
pass

if not status or status.runtime_status.name != 'RUNNING':
# TODO update to use reuse_id_policy
instance_id = wf_client.schedule_new_workflow(
14 changes: 12 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow
from dapr.ext.workflow.util import getAddress
from grpc import RpcError

from dapr.clients import DaprInternalError
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
@@ -130,8 +131,17 @@ def get_workflow_state(
exist.
"""
state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
return WorkflowState(state) if state else None
try:
state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
return WorkflowState(state) if state else None
except RpcError as error:
if 'no such instance exists' in error.details():
self._logger.warning(f'Workflow instance not found: {instance_id}')
return None
self._logger.error(
f'Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}'
)
raise

def wait_for_workflow_start(
self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60
38 changes: 37 additions & 1 deletion ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb
from grpc import RpcError

mock_schedule_result = 'workflow001'
mock_raise_event_result = 'event001'
@@ -29,6 +30,19 @@
mock_resume_result = 'resume001'
mock_purge_result = 'purge001'
mock_instance_id = 'instance001'
wf_status = 'not-found'


class SimulatedRpcError(RpcError):
def __init__(self, code, details):
self._code = code
self._details = details

def code(self):
return self._code

def details(self):
return self._details


class FakeTaskHubGrpcClient:
@@ -43,7 +57,15 @@ def schedule_new_orchestration(
return mock_schedule_result

def get_orchestration_state(self, instance_id, fetch_payloads):
return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.PENDING)
global wf_status
if wf_status == 'not-found':
raise SimulatedRpcError(code='UNKNOWN', details='no such instance exists')
elif wf_status == 'found':
return self._inner_get_orchestration_state(
instance_id, client.OrchestrationStatus.PENDING
)
else:
raise SimulatedRpcError(code='UNKNOWN', details='unknown error')

def wait_for_orchestration_start(self, instance_id, fetch_payloads, timeout):
return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.RUNNING)
@@ -100,6 +122,20 @@ def test_client_functions(self):
)
assert actual_schedule_result == mock_schedule_result

global wf_status
wf_status = 'not-found'
actual_get_result = wfClient.get_workflow_state(
instance_id=mock_instance_id, fetch_payloads=True
)
assert actual_get_result is None

wf_status = 'error'
with self.assertRaises(RpcError):
wfClient.get_workflow_state(instance_id=mock_instance_id, fetch_payloads=True)

assert actual_get_result is None

wf_status = 'found'
actual_get_result = wfClient.get_workflow_state(
instance_id=mock_instance_id, fetch_payloads=True
)

0 comments on commit 488189e

Please sign in to comment.