Skip to content

Commit

Permalink
Merge pull request #409 from sartography/enhancement/allow-worfklow-t…
Browse files Browse the repository at this point in the history
…argets-from-external-events

handle target workflows in send_event
  • Loading branch information
essweine authored May 14, 2024
2 parents ac2d8f3 + 9d18aea commit 6d18cb8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 18 deletions.
4 changes: 4 additions & 0 deletions SpiffWorkflow/bpmn/util/subworkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from SpiffWorkflow import Workflow
from SpiffWorkflow.exceptions import TaskNotFoundException
from .task import BpmnTaskIterator

class BpmnBaseWorkflow(Workflow):

Expand All @@ -31,6 +32,9 @@ def __init__(self, spec, **kwargs):
def data_objects(self):
return self.data.get('data_objects', {})

def get_tasks_iterator(self, first_task=None, **kwargs):
return BpmnTaskIterator(first_task or self.task_tree, **kwargs)


class BpmnSubWorkflow(BpmnBaseWorkflow):

Expand Down
9 changes: 7 additions & 2 deletions SpiffWorkflow/bpmn/util/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ def _catches_event(task):

class BpmnTaskIterator(TaskIterator):

def __init__(self, task, end_at_spec=None, max_depth=1000, depth_first=True, task_filter=None, **kwargs):
def __init__(self, task, end_at_spec=None, max_depth=1000, depth_first=True, skip_subpprocesses=False, task_filter=None, **kwargs):

task_filter = task_filter or BpmnTaskFilter(**kwargs)
super().__init__(task, end_at_spec, max_depth, depth_first, task_filter)
self.skip_subpprocesses = skip_subpprocesses

def _next(self):

Expand All @@ -61,7 +62,11 @@ def _next(self):
task.task_spec.name != self.end_at_spec,
]):
# Do not descend into a completed subprocess to look for unfinished tasks.
if subprocess is None or (task.state >= TaskState.FINISHED_MASK and self.task_filter.state <= TaskState.FINISHED_MASK):
if (
subprocess is None
or self.skip_subpprocesses
or (task.state >= TaskState.FINISHED_MASK and self.task_filter.state <= TaskState.FINISHED_MASK)
):
subprocess_tasks = []
else:
subprocess_tasks = [subprocess.task_tree]
Expand Down
31 changes: 15 additions & 16 deletions SpiffWorkflow/bpmn/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from SpiffWorkflow.bpmn.specs.control import BoundaryEventSplit

from SpiffWorkflow.bpmn.util.subworkflow import BpmnBaseWorkflow, BpmnSubWorkflow
from SpiffWorkflow.bpmn.util.task import BpmnTaskIterator

from .script_engine.python_engine import PythonScriptEngine

Expand Down Expand Up @@ -79,9 +78,6 @@ def parent_workflow(self):
def depth(self):
return 0

def get_tasks_iterator(self, first_task=None, **kwargs):
return BpmnTaskIterator(first_task or self.task_tree, **kwargs)

def create_subprocess(self, my_task, spec_name):
# This creates a subprocess for an existing task
subprocess = BpmnSubWorkflow(
Expand Down Expand Up @@ -114,30 +110,33 @@ def catch(self, event):
:param event: the thrown event
"""
if event.target is None:
if event.target is not None:
# This limits results to tasks in the specified workflow
tasks = event.target.get_tasks(skip_subpprocesses=True, state=TaskState.NOT_FINISHED_MASK, catches_event=event)
else:
self.update_collaboration(event)
tasks = self.get_tasks(state=TaskState.NOT_FINISHED_MASK, catches_event=event)
# Figure out if we need to create an external event
if len(tasks) == 0:
self.bpmn_events.append(event)
else:
tasks = self.get_tasks(state=TaskState.NOT_FINISHED_MASK, catches_event=event)

for task in tasks:
task.task_spec.catch(task, event)

self.refresh_waiting_tasks()
if len(tasks) > 0:
self.refresh_waiting_tasks()

def send_event(self, event):
"""Allows this workflow to catch an externally generated event."""

tasks = self.get_tasks(state=TaskState.NOT_FINISHED_MASK, catches_event=event)
if len(tasks) == 0:
raise WorkflowException(f"This process is not waiting for {event.event_definition.name}")
for task in tasks:
task.task_spec.catch(task, event)

self.refresh_waiting_tasks()
if event.target is not None:
self.catch(event)
else:
tasks = self.get_tasks(state=TaskState.NOT_FINISHED_MASK, catches_event=event)
if len(tasks) == 0:
raise WorkflowException(f"This process is not waiting for {event.event_definition.name}")
for task in tasks:
task.task_spec.catch(task, event)
self.refresh_waiting_tasks()

def get_events(self):
"""Returns the list of events that cannot be handled from within this workflow."""
Expand Down

0 comments on commit 6d18cb8

Please sign in to comment.