Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch error in sdk when workflow instance not found #771

Merged
merged 6 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,17 @@ def main():

# Pause Test
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
sleep(3)

get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
print(f'Get response from {workflow_name} after pause call: {get_response.runtime_status}')

# Resume Test
d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
sleep(3)

get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
Expand Down
1 change: 1 addition & 0 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def send_alert(ctx, message: str):
status = wf_client.get_workflow_state(job_id)
except Exception:
pass

if not status or status.runtime_status.name != 'RUNNING':
# TODO update to use reuse_id_policy
instance_id = wf_client.schedule_new_workflow(
Expand Down
14 changes: 12 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow
from dapr.ext.workflow.util import getAddress
from grpc import RpcError

from dapr.clients import DaprInternalError
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
Expand Down Expand Up @@ -130,8 +131,17 @@ def get_workflow_state(
exist.

"""
state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
return WorkflowState(state) if state else None
try:
state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
return WorkflowState(state) if state else None
except RpcError as error:
if 'no such instance exists' in error.details():
self._logger.warning(f'Workflow instance not found: {instance_id}')
return None
self._logger.error(
f'Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}'
)
raise

def wait_for_workflow_start(
self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60
Expand Down
38 changes: 37 additions & 1 deletion ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb
from grpc import RpcError

mock_schedule_result = 'workflow001'
mock_raise_event_result = 'event001'
Expand All @@ -29,6 +30,19 @@
mock_resume_result = 'resume001'
mock_purge_result = 'purge001'
mock_instance_id = 'instance001'
wf_status = 'not-found'


class SimulatedRpcError(RpcError):
def __init__(self, code, details):
self._code = code
self._details = details

def code(self):
return self._code

def details(self):
return self._details


class FakeTaskHubGrpcClient:
Expand All @@ -43,7 +57,15 @@ def schedule_new_orchestration(
return mock_schedule_result

def get_orchestration_state(self, instance_id, fetch_payloads):
return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.PENDING)
global wf_status
if wf_status == 'not-found':
raise SimulatedRpcError(code='UNKNOWN', details='no such instance exists')
elif wf_status == 'found':
return self._inner_get_orchestration_state(
instance_id, client.OrchestrationStatus.PENDING
)
else:
raise SimulatedRpcError(code='UNKNOWN', details='unknown error')

def wait_for_orchestration_start(self, instance_id, fetch_payloads, timeout):
return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.RUNNING)
Expand Down Expand Up @@ -100,6 +122,20 @@ def test_client_functions(self):
)
assert actual_schedule_result == mock_schedule_result

global wf_status
wf_status = 'not-found'
actual_get_result = wfClient.get_workflow_state(
instance_id=mock_instance_id, fetch_payloads=True
)
assert actual_get_result is None

wf_status = 'error'
with self.assertRaises(RpcError):
wfClient.get_workflow_state(instance_id=mock_instance_id, fetch_payloads=True)

assert actual_get_result is None

wf_status = 'found'
actual_get_result = wfClient.get_workflow_state(
instance_id=mock_instance_id, fetch_payloads=True
)
Expand Down