From 54f80ea8209dac83631d60373a12b1542f83a0f0 Mon Sep 17 00:00:00 2001 From: Elizabeth Esswein Date: Tue, 28 May 2024 13:00:06 -0400 Subject: [PATCH 1/3] improvements to iteration over subprocesses --- SpiffWorkflow/bpmn/util/subworkflow.py | 4 ++-- SpiffWorkflow/bpmn/workflow.py | 13 +++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/SpiffWorkflow/bpmn/util/subworkflow.py b/SpiffWorkflow/bpmn/util/subworkflow.py index 4e1740ef..711e053b 100644 --- a/SpiffWorkflow/bpmn/util/subworkflow.py +++ b/SpiffWorkflow/bpmn/util/subworkflow.py @@ -43,6 +43,7 @@ def __init__(self, spec, parent_task_id, top_workflow, **kwargs): self.parent_task_id = parent_task_id self.top_workflow = top_workflow self.correlations = {} + self.depth = self._calculate_depth() @property def script_engine(self): @@ -53,8 +54,7 @@ def parent_workflow(self): task = self.top_workflow.get_task_from_id(self.parent_task_id) return task.workflow - @property - def depth(self): + def _calculate_depth(self): current, depth = self, 0 while current.parent_workflow is not None: depth += 1 diff --git a/SpiffWorkflow/bpmn/workflow.py b/SpiffWorkflow/bpmn/workflow.py index a6ab8f38..f4b0ecd6 100644 --- a/SpiffWorkflow/bpmn/workflow.py +++ b/SpiffWorkflow/bpmn/workflow.py @@ -200,17 +200,18 @@ def update_task(task): did_refresh_task(task) for subprocess in sorted(self.get_active_subprocesses(), key=lambda v: v.depth, reverse=True): - for task in subprocess.get_tasks_iterator(state=TaskState.WAITING): + for task in subprocess.get_tasks_iterator(skip_subprocesses=True, state=TaskState.WAITING): update_task(task) - for task in self.get_tasks_iterator(state=TaskState.WAITING): + for task in self.get_tasks_iterator(skip_subprocesses=True, state=TaskState.WAITING): update_task(task) def get_task_from_id(self, task_id): - for subprocess in self.subprocesses.values(): - task = subprocess.get_task_from_id(task_id) - if task is not None: - return task + if task_id not in self.tasks: + for subprocess in self.subprocesses.values(): + task = subprocess.get_task_from_id(task_id) + if task is not None: + return task return super().get_task_from_id(task_id) def reset_from_task_id(self, task_id, data=None, remove_subprocess=True): From 0833800179d8b11309767f4b90bd3b7545aa8ae3 Mon Sep 17 00:00:00 2001 From: Elizabeth Esswein Date: Tue, 28 May 2024 16:11:22 -0400 Subject: [PATCH 2/3] use events in MI tasks --- .../bpmn/serializer/default/process_spec.py | 8 + .../bpmn/specs/mixins/multiinstance_task.py | 215 ++++++++---------- .../camunda/specs/multiinstance_task.py | 4 +- 3 files changed, 107 insertions(+), 120 deletions(-) diff --git a/SpiffWorkflow/bpmn/serializer/default/process_spec.py b/SpiffWorkflow/bpmn/serializer/default/process_spec.py index c0f60fec..ec80fb26 100644 --- a/SpiffWorkflow/bpmn/serializer/default/process_spec.py +++ b/SpiffWorkflow/bpmn/serializer/default/process_spec.py @@ -18,6 +18,7 @@ # 02110-1301 USA from ..helpers.bpmn_converter import BpmnConverter +from SpiffWorkflow.bpmn.specs.mixins.multiinstance_task import LoopTask class BpmnProcessSpecConverter(BpmnConverter): @@ -71,6 +72,7 @@ def from_dict(self, dct): # Add messaging related stuff spec.correlation_keys = dct.pop('correlation_keys', {}) + loop_tasks = [] dct['task_specs'].pop('Root', None) for name, task_dict in dct['task_specs'].items(): # I hate this, but I need to pass in the workflow spec when I create the task. 
@@ -80,6 +82,12 @@ def from_dict(self, dct): task_spec = self.registry.restore(task_dict) if name == 'Start': spec.start = task_spec + if isinstance(task_spec, LoopTask): + loop_tasks.append(task_spec) self.restore_task_spec_extensions(task_dict, task_spec) + for task_spec in loop_tasks: + child_spec = spec.task_specs.get(task_spec.task_spec) + child_spec.completed_event.connect(task_spec.merge_child) + return spec diff --git a/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py b/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py index e4ad3b2c..795a3bd3 100644 --- a/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py +++ b/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py @@ -29,25 +29,6 @@ class LoopTask(BpmnTaskSpec): - def process_children(self, my_task): - """ - Handle any newly completed children and update merged tasks. - Returns a boolean indicating whether there is a child currently running - """ - merged = self._merged_children(my_task) - child_running = False - for child in self._instances(my_task): - if child.has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged: - self.child_completed_action(my_task, child) - merged.append(str(child.id)) - elif not child.has_state(TaskState.FINISHED_MASK): - child_running = True - my_task.internal_data['merged'] = merged - return child_running - - def child_completed_action(self, my_task, child): - raise NotImplementedError - def _merged_children(self, my_task): return my_task.internal_data.get('merged', []) @@ -73,40 +54,37 @@ def task_info(self, my_task): return info def _update_hook(self, my_task): - - if my_task.state != TaskState.WAITING: - super()._update_hook(my_task) - - child_running = self.process_children(my_task) - if child_running: - # We're in the middle of an iteration; we're not done and we can't create a new task - return False - elif self.loop_complete(my_task): - # No children running and one of the completion conditions has been met; done + super()._update_hook(my_task) + if self.test_before and self.loop_complete(my_task): return True else: - # Execute again - if my_task.state != TaskState.WAITING: - my_task._set_state(TaskState.WAITING) - task_spec = my_task.workflow.spec.task_specs[self.task_spec] - child = my_task._add_child(task_spec, TaskState.WAITING) - child.triggered = True - child.data = deepcopy(my_task.data) - child.internal_data['iteration'] = len(self._merged_children(my_task)) - - def child_completed_action(self, my_task, child): + my_task._set_state(TaskState.STARTED) + my_task.internal_data['merged'] = [] + self.create_child(my_task) + + def create_child(self, my_task): + task_spec = my_task.workflow.spec.task_specs[self.task_spec] + if not task_spec.completed_event.is_connected(self.merge_child): + task_spec.completed_event.connect(self.merge_child) + child = my_task._add_child(task_spec, TaskState.WAITING) + child.triggered = True + child.internal_data['iteration'] = len(self._merged_children(my_task)) + child.task_spec._update(child) + + def merge_child(self, workflow, child): + my_task = child.parent DeepMerge.merge(my_task.data, child.data) + my_task.internal_data['merged'].append(str(child.id)) + if self.loop_complete(my_task): + my_task._set_state(TaskState.READY) + else: + self.create_child(my_task) def loop_complete(self, my_task): - merged = self._merged_children(my_task) - if not self.test_before and len(merged) == 0: - # "test before" isn't really compatible our execution model in a transparent way - # This guarantees that the task will run at least once if test_before 
is False - return False - else: - max_complete = self.maximum is not None and len(merged) >= self.maximum - cond_complete = self.condition is not None and my_task.workflow.script_engine.evaluate(my_task, self.condition) - return max_complete or cond_complete + merged = my_task.internal_data.get('merged', []) + max_complete = self.maximum is not None and len(merged) >= self.maximum + cond_complete = self.condition is not None and my_task.workflow.script_engine.evaluate(my_task, self.condition) + return max_complete or cond_complete class MultiInstanceTask(LoopTask): @@ -145,9 +123,9 @@ def task_info(self, my_task): info['instance_map'][str(value)] = str(task.id) return info - def child_completed_action(self, my_task, child): + def merge_child(self, workflow, child): """This merges child data into this task's data.""" - + my_task = child.parent if self.data_output is not None and self.output_item is not None: if not self.output_item.exists(child): self.raise_data_exception("Expected an output item", child) @@ -161,10 +139,13 @@ def child_completed_action(self, my_task, child): data_output.append(item) else: DeepMerge.merge(my_task.data, child.data) + my_task.internal_data['merged'].append(str(child.id)) def create_child(self, my_task, item, key_or_index=None): task_spec = my_task.workflow.spec.task_specs[self.task_spec] + if not task_spec.completed_event.is_connected(self.merge_child): + task_spec.completed_event.connect(self.merge_child) child = my_task._add_child(task_spec, TaskState.WAITING) child.triggered = True if self.input_item is not None and self.input_item.exists(my_task): @@ -181,8 +162,8 @@ def create_child(self, my_task, item, key_or_index=None): def check_completion_condition(self, my_task): - merged = self._merged_children(my_task) - if len(merged) > 0: + merged = my_task.internal_data.get('merged', []) + if len(merged) > 0 and self.condition is not None: last_child = [c for c in my_task.children if str(c.id) == merged[-1]][0] return my_task.workflow.script_engine.evaluate(last_child, self.condition) @@ -225,17 +206,23 @@ def raise_data_exception(self, message, my_task): class SequentialMultiInstanceTask(MultiInstanceTask): def _update_hook(self, my_task): - - if my_task.state != TaskState.WAITING: - super()._update_hook(my_task) - - child_running = self.process_children(my_task) - if child_running: - return False - if self.condition is not None and self.check_completion_condition(my_task): - return True + super()._update_hook(my_task) + my_task.internal_data['merged'] = [] + if self.data_input is not None: + input_data = self.data_input.get(my_task) + my_task.internal_data['remaining'] = self.init_remaining_items(my_task) + if self.data_output is not None: + self.init_data_output_with_input_data(my_task, input_data) + else: + my_task.internal_data['cardinality'] = my_task.workflow.script_engine.evaluate(my_task, self.cardinality) + my_task.internal_data['current'] = 0 + if self.data_output is not None: + self.init_data_output_with_cardinality(my_task) + self.add_next_child(my_task) + if not self.children_complete(my_task): + my_task._set_state(TaskState.STARTED) else: - return self.add_next_child(my_task) + return True def task_info(self, my_task): info = super().task_info(my_task) @@ -245,29 +232,17 @@ def task_info(self, my_task): return info def add_next_child(self, my_task): - if self.data_input is not None: key_or_index, item = self.get_next_input_item(my_task) else: key_or_index, item = self.get_next_index(my_task) - if item is not None: - if my_task.state != 
TaskState.WAITING:
-                my_task._set_state(TaskState.WAITING)
         self.create_child(my_task, item, key_or_index)
-        else:
-            return True
 
     def get_next_input_item(self, my_task):
 
         input_data = self.data_input.get(my_task)
         remaining = my_task.internal_data.get('remaining')
-
-        if remaining is None:
-            remaining = self.init_remaining_items(my_task)
-            if self.data_output is not None:
-                self.init_data_output_with_input_data(my_task, input_data)
-
         if len(remaining) > 0:
             if isinstance(input_data, (Mapping, Sequence)):
                 # In this case, we want to preserve a key or index
@@ -280,6 +255,25 @@
         else:
             return None, None
 
+    def get_next_index(self, my_task):
+
+        current = my_task.internal_data.get('current')
+        cardinality = my_task.internal_data.get('cardinality')
+        if current < cardinality:
+            # If using loop cardinality, use the index as the "item"
+            my_task.internal_data['current'] = current + 1
+            return None, current
+        else:
+            return None, None
+
+    def merge_child(self, workflow, child):
+        super().merge_child(workflow, child)
+        my_task = child.parent
+        if self.children_complete(my_task) or self.check_completion_condition(my_task):
+            my_task._set_state(TaskState.READY)
+        else:
+            self.add_next_child(my_task)
+
     def init_remaining_items(self, my_task):
 
         if not self.data_input.exists(my_task):
@@ -300,43 +294,34 @@ def init_remaining_items(self, my_task):
             self.raise_data_exception("Multiinstance data input must be iterable", my_task)
         return remaining
 
-    def get_next_index(self, my_task):
-
-        current = my_task.internal_data.get('current')
-        if current is None:
-            current = 0
-            if self.data_output is not None:
-                self.init_data_output_with_cardinality(my_task)
-
-        cardinality = my_task.internal_data.get('cardinality')
-        if cardinality is None:
-            # In case the evaluated expression changes during execution
-            cardinality = my_task.workflow.script_engine.evaluate(my_task, self.cardinality)
-            my_task.internal_data['cardinality'] = cardinality
-
-        if current < cardinality:
-            # If using loop cardinalty, if a data input was specified, use the index as the "item"
-            my_task.internal_data['current'] = current + 1
-            return None, current
+    def children_complete(self, my_task):
+        if self.data_input is not None:
+            return len(my_task.internal_data.get('remaining', [])) == 0
         else:
-            return None, None
+            return my_task.internal_data.get('current', 0) == my_task.internal_data.get('cardinality', 0)
 
 
 class ParallelMultiInstanceTask(MultiInstanceTask):
 
     def _update_hook(self, my_task):
-
-        if my_task.state != TaskState.WAITING:
-            super()._update_hook(my_task)
-            self.create_children(my_task)
-
-        child_running = self.process_children(my_task)
-        if self.condition is not None and self.check_completion_condition(my_task):
-            for child in my_task.children:
-                if child.task_spec.name == self.task_spec and child.state != TaskState.COMPLETED:
-                    child.cancel()
+        super()._update_hook(my_task)
+        my_task.internal_data['merged'] = []
+        self.create_children(my_task)
+        # If the input collection or cardinality is 0, there won't be any children to cause the task to become ready
+        if not self.children_complete(my_task):
+            my_task._set_state(TaskState.STARTED)
+        else:
             return True
-        return not child_running
+
+    def merge_child(self, workflow, child):
+        super().merge_child(workflow, child)
+        my_task = child.parent
+        if self.check_completion_condition(my_task):
+            for child in self._instances(my_task):
+                child.cancel()
+            my_task._set_state(TaskState.READY)
+        elif self.children_complete(my_task):
+            my_task._set_state(TaskState.READY)
 
     def create_children(self, my_task):
@@ -355,18 +340,14 @@ def create_children(self, my_task):
             cardinality = my_task.workflow.script_engine.evaluate(my_task, self.cardinality)
             children = ((idx, idx) for idx in range(cardinality))
 
-        if not my_task.internal_data.get('started', False):
-
-            if self.data_output is not None:
-                if self.data_input is not None:
-                    self.init_data_output_with_input_data(my_task, self.data_input.get(my_task))
-                else:
-                    self.init_data_output_with_cardinality(my_task)
+        if self.data_output is not None:
+            if self.data_input is not None:
+                self.init_data_output_with_input_data(my_task, self.data_input.get(my_task))
+            else:
+                self.init_data_output_with_cardinality(my_task)
 
-            my_task._set_state(TaskState.WAITING)
-            for key_or_index, item in children:
-                self.create_child(my_task, item, key_or_index)
+        for key_or_index, item in children:
+            self.create_child(my_task, item, key_or_index)
 
-            my_task.internal_data['started'] = True
-        else:
-            return len(self._merged_children(my_task)) == len(children)
+    def children_complete(self, my_task):
+        return all(c.state == TaskState.COMPLETED for c in self._instances(my_task))
\ No newline at end of file
diff --git a/SpiffWorkflow/camunda/specs/multiinstance_task.py b/SpiffWorkflow/camunda/specs/multiinstance_task.py
index 764d12f8..364e4bd8 100644
--- a/SpiffWorkflow/camunda/specs/multiinstance_task.py
+++ b/SpiffWorkflow/camunda/specs/multiinstance_task.py
@@ -64,7 +64,5 @@ def _update_hook(self, my_task):
 class ParallelMultiInstanceTask(BpmnParallelMITask):
 
     def _update_hook(self, my_task):
-        if not my_task.internal_data.get('started', False):
-            update_task_spec(my_task)
-            self.create_children(my_task)
+        update_task_spec(my_task)
         return super()._update_hook(my_task)
\ No newline at end of file

From 450379953c89bea7a4bc2f5b8707b7338b9a51ae Mon Sep 17 00:00:00 2001
From: Elizabeth Esswein
Date: Tue, 28 May 2024 23:31:23 -0400
Subject: [PATCH 3/3] add version migration

---
 .../bpmn/serializer/migration/version_1_4.py  |  13 +
 .../serializer/migration/version_migration.py |  26 +-
 SpiffWorkflow/bpmn/serializer/workflow.py     |   2 +-
 .../data/serialization/v1.3-mi-states.json    | 378 ++++++++++++++++++
 .../bpmn/serializer/VersionMigrationTest.py   |  33 ++
 5 files changed, 450 insertions(+), 2 deletions(-)
 create mode 100644 SpiffWorkflow/bpmn/serializer/migration/version_1_4.py
 create mode 100644 tests/SpiffWorkflow/bpmn/data/serialization/v1.3-mi-states.json

diff --git a/SpiffWorkflow/bpmn/serializer/migration/version_1_4.py b/SpiffWorkflow/bpmn/serializer/migration/version_1_4.py
new file mode 100644
index 00000000..8a4cc7d1
--- /dev/null
+++ b/SpiffWorkflow/bpmn/serializer/migration/version_1_4.py
@@ -0,0 +1,13 @@
+
+def update_mi_states(dct):
+
+    typenames = ['StandardLoopTask', 'SequentialMultiInstanceTask', 'ParallelMultiInstanceTask']
+    def update(tasks, task_specs):
+        for task in tasks:
+            task_spec = task_specs.get(task['task_spec'], {})
+            if task['state'] == 8 and task_spec.get('typename') in typenames:
+                task['state'] = 32
+
+    for sp in dct['subprocesses'].values():
+        update(sp['tasks'].values(), sp['spec']['task_specs'])
+    update(dct['tasks'].values(), dct['spec']['task_specs'])
diff --git a/SpiffWorkflow/bpmn/serializer/migration/version_migration.py b/SpiffWorkflow/bpmn/serializer/migration/version_migration.py
index cf193283..15b39a58 100644
--- a/SpiffWorkflow/bpmn/serializer/migration/version_migration.py
+++ b/SpiffWorkflow/bpmn/serializer/migration/version_migration.py
@@ -35,8 +35,32 @@
     add_new_typenames,
     update_data_objects,
 )
+from .version_1_4 import update_mi_states
+
+def from_version_1_3(dct):
+    """Upgrade serialization from v1.3 to v1.4
+
+    Multiinstance tasks now rely on events rather than polling to merge children, so once
+    they are reached, they should be STARTED rather than WAITING.
+    """
+    dct['VERSION'] = "1.4"
+    update_mi_states(dct)
 
 def from_version_1_2(dct):
+    """Upgrade serialization from v1.2 to v1.3
+
+    The internal/external distinction on event definitions was replaced with the ability to
+    target a specific workflow.
+
+    Boundary event parent gateway tasks have been replaced with a gateway structure.
+
+    The creation of an unnecessary root task was removed; the workflow spec's start task is
+    used as the root instead.
+
+    BpmnWorkflows and BpmnSubworkflows were split into two classes.
+
+    Data objects are now stored on the topmost workflow where they are defined.
+    """
     dct['VERSION'] = "1.3"
     update_event_definition_attributes(dct)
     remove_boundary_event_parent(dct)
@@ -44,7 +68,6 @@ def from_version_1_2(dct):
     add_new_typenames(dct)
     update_data_objects(dct)
 
-
 def from_version_1_1(dct):
     """
     Upgrade v1.1 serialization to v1.2.
@@ -98,4 +121,5 @@ def from_version_1_0(dct):
     '1.0': from_version_1_0,
     '1.1': from_version_1_1,
     '1.2': from_version_1_2,
+    '1.3': from_version_1_3,
 }
diff --git a/SpiffWorkflow/bpmn/serializer/workflow.py b/SpiffWorkflow/bpmn/serializer/workflow.py
index c76965f6..a13af7d0 100644
--- a/SpiffWorkflow/bpmn/serializer/workflow.py
+++ b/SpiffWorkflow/bpmn/serializer/workflow.py
@@ -25,7 +25,7 @@
 from .config import DEFAULT_CONFIG
 
 # This is the default version set on the workflow, it can be overridden in init
-VERSION = "1.3"
+VERSION = "1.4"
 
 class BpmnWorkflowSerializer:
diff --git a/tests/SpiffWorkflow/bpmn/data/serialization/v1.3-mi-states.json b/tests/SpiffWorkflow/bpmn/data/serialization/v1.3-mi-states.json
new file mode 100644
index 00000000..8e192887
--- /dev/null
+++ b/tests/SpiffWorkflow/bpmn/data/serialization/v1.3-mi-states.json
@@ -0,0 +1,378 @@
+{
+  "serializer_version": "1.3",
+  "data":{},
+  "correlations":{},
+  "last_task":"d022bf75-49a5-4c05-a46d-e3bd34029b1e",
+  "success":true,
+  "tasks":{
+    "8d0c2d46-0de5-4616-a7ce-6d7f43e9dd3d":{
+      "id":"8d0c2d46-0de5-4616-a7ce-6d7f43e9dd3d",
+      "parent":null,
+      "children":[
+        "edea6aef-cbba-4fb8-a9fa-566b8f3e9df5"
+      ],
+      "last_state_change":1716952026.0028443,
+      "state":64,
+      "task_spec":"Start",
+      "triggered":false,
+      "internal_data":{},
+      "data":{
+        "input_data":[
+          1,
+          2,
+          3
+        ]
+      },
+      "typename":"Task"
+    },
+    "edea6aef-cbba-4fb8-a9fa-566b8f3e9df5":{
+      "id":"edea6aef-cbba-4fb8-a9fa-566b8f3e9df5",
+      "parent":"8d0c2d46-0de5-4616-a7ce-6d7f43e9dd3d",
+      "children":[
+        "32cf0153-96b2-498f-8969-d0147e26af08"
+      ],
+      "last_state_change":1716952026.003962,
+      "state":64,
+      "task_spec":"StartEvent_1",
+      "triggered":false,
+      "internal_data":{
+        "event_fired":true
+      },
+      "data":{
+        "input_data":[
+          1,
+          2,
+          3
+        ]
+      },
+      "typename":"Task"
+    },
+    "32cf0153-96b2-498f-8969-d0147e26af08":{
+      "id":"32cf0153-96b2-498f-8969-d0147e26af08",
+      "parent":"edea6aef-cbba-4fb8-a9fa-566b8f3e9df5",
+      "children":[
+        "a5e49220-f190-4580-b1f0-d728a47ea87c",
+        "56892ee3-26fb-45f7-83af-d3f7dfd29e16",
+        "d022bf75-49a5-4c05-a46d-e3bd34029b1e",
+        "073c949f-5ee2-4132-8198-f7a20b1eaf91"
+      ],
+      "last_state_change":1716952026.0043452,
+      "state":8,
+      "task_spec":"any_task",
+      "triggered":false,
+      "internal_data":{
+        "started":true,
+        "merged":[
+          "56892ee3-26fb-45f7-83af-d3f7dfd29e16",
+          "d022bf75-49a5-4c05-a46d-e3bd34029b1e"
+        ]
+      },
+      "data":{
+        "input_data":[
+          1,
+          
2, + 3 + ], + "output_data":[ + 2, + 4 + ] + }, + "typename":"Task" + }, + "a5e49220-f190-4580-b1f0-d728a47ea87c":{ + "id":"a5e49220-f190-4580-b1f0-d728a47ea87c", + "parent":"32cf0153-96b2-498f-8969-d0147e26af08", + "children":[ + "9f9e2c26-d33e-476c-987f-98d08da790db" + ], + "last_state_change":1716952026.0009453, + "state":4, + "task_spec":"Event_1xk7z3g", + "triggered":false, + "internal_data":{}, + "data":{}, + "typename":"Task" + }, + "9f9e2c26-d33e-476c-987f-98d08da790db":{ + "id":"9f9e2c26-d33e-476c-987f-98d08da790db", + "parent":"a5e49220-f190-4580-b1f0-d728a47ea87c", + "children":[ + "39da9add-e22c-4a43-9719-3970c78194e7" + ], + "last_state_change":1716952026.001038, + "state":4, + "task_spec":"main.EndJoin", + "triggered":false, + "internal_data":{}, + "data":{}, + "typename":"Task" + }, + "39da9add-e22c-4a43-9719-3970c78194e7":{ + "id":"39da9add-e22c-4a43-9719-3970c78194e7", + "parent":"9f9e2c26-d33e-476c-987f-98d08da790db", + "children":[], + "last_state_change":1716952026.001173, + "state":4, + "task_spec":"End", + "triggered":false, + "internal_data":{}, + "data":{}, + "typename":"Task" + }, + "56892ee3-26fb-45f7-83af-d3f7dfd29e16":{ + "id":"56892ee3-26fb-45f7-83af-d3f7dfd29e16", + "parent":"32cf0153-96b2-498f-8969-d0147e26af08", + "children":[], + "last_state_change":1716952026.0068834, + "state":64, + "task_spec":"any_task [child]", + "triggered":true, + "internal_data":{ + "key_or_index":0 + }, + "data":{ + "input_item":1, + "input_data":[ + 1, + 2, + 3 + ], + "output_data":[], + "output_item":2 + }, + "typename":"Task" + }, + "d022bf75-49a5-4c05-a46d-e3bd34029b1e":{ + "id":"d022bf75-49a5-4c05-a46d-e3bd34029b1e", + "parent":"32cf0153-96b2-498f-8969-d0147e26af08", + "children":[], + "last_state_change":1716952029.4668372, + "state":64, + "task_spec":"any_task [child]", + "triggered":true, + "internal_data":{ + "key_or_index":1 + }, + "data":{ + "input_item":2, + "input_data":[ + 1, + 2, + 3 + ], + "output_data":[], + "output_item":4 + }, + "typename":"Task" + }, + "073c949f-5ee2-4132-8198-f7a20b1eaf91":{ + "id":"073c949f-5ee2-4132-8198-f7a20b1eaf91", + "parent":"32cf0153-96b2-498f-8969-d0147e26af08", + "children":[], + "last_state_change":1716952026.0049052, + "state":16, + "task_spec":"any_task [child]", + "triggered":true, + "internal_data":{ + "key_or_index":2 + }, + "data":{ + "input_item":3, + "input_data":[ + 1, + 2, + 3 + ], + "output_data":[] + }, + "typename":"Task" + } + }, + "root":"8d0c2d46-0de5-4616-a7ce-6d7f43e9dd3d", + "spec":{ + "name":"main", + "description":"main", + "file":"/home/essweine/work/sartography/SpiffWorkflow/tests/SpiffWorkflow/bpmn/data/parallel_multiinstance_loop_input.bpmn", + "task_specs":{ + "Start":{ + "name":"Start", + "description":"BPMN Task", + "manual":false, + "lookahead":2, + "inputs":[], + "outputs":[ + "StartEvent_1" + ], + "bpmn_id":null, + "bpmn_name":null, + "lane":null, + "documentation":null, + "data_input_associations":[], + "data_output_associations":[], + "io_specification":null, + "typename":"BpmnStartTask" + }, + "main.EndJoin":{ + "name":"main.EndJoin", + "description":"BPMN Task", + "manual":false, + "lookahead":2, + "inputs":[ + "Event_1xk7z3g" + ], + "outputs":[ + "End" + ], + "bpmn_id":null, + "bpmn_name":null, + "lane":null, + "documentation":null, + "data_input_associations":[], + "data_output_associations":[], + "io_specification":null, + "typename":"_EndJoin" + }, + "End":{ + "name":"End", + "description":"BPMN Task", + "manual":false, + "lookahead":2, + "inputs":[ + "main.EndJoin" + ], + "outputs":[], + 
"bpmn_id":null, + "bpmn_name":null, + "lane":null, + "documentation":null, + "data_input_associations":[], + "data_output_associations":[], + "io_specification":null, + "typename":"SimpleBpmnTask" + }, + "StartEvent_1":{ + "name":"StartEvent_1", + "description":"Default Start Event", + "manual":false, + "lookahead":2, + "inputs":[ + "Start" + ], + "outputs":[ + "any_task" + ], + "bpmn_id":"StartEvent_1", + "bpmn_name":null, + "lane":null, + "documentation":null, + "data_input_associations":[], + "data_output_associations":[], + "io_specification":null, + "event_definition":{ + "description":"Default", + "name":null, + "typename":"NoneEventDefinition" + }, + "typename":"StartEvent", + "extensions":{} + }, + "any_task":{ + "name":"any_task", + "description":"Parallel MultiInstance", + "manual":false, + "lookahead":2, + "inputs":[ + "StartEvent_1" + ], + "outputs":[ + "Event_1xk7z3g" + ], + "bpmn_id":"any_task", + "bpmn_name":null, + "lane":null, + "documentation":null, + "data_input_associations":[], + "data_output_associations":[], + "io_specification":null, + "task_spec":"any_task [child]", + "cardinality":null, + "data_input":{ + "bpmn_id":"input_data", + "bpmn_name":null, + "typename":"TaskDataReference" + }, + "data_output":{ + "bpmn_id":"output_data", + "bpmn_name":null, + "typename":"TaskDataReference" + }, + "input_item":{ + "bpmn_id":"input_item", + "bpmn_name":"input item", + "typename":"TaskDataReference" + }, + "output_item":{ + "bpmn_id":"output_item", + "bpmn_name":"output item", + "typename":"TaskDataReference" + }, + "condition":null, + "typename":"ParallelMultiInstanceTask" + }, + "any_task [child]":{ + "name":"any_task [child]", + "description":"Task", + "manual":true, + "lookahead":2, + "inputs":[ + "any_task" + ], + "outputs":[], + "bpmn_id":"any_task", + "bpmn_name":"Any Task", + "lane":null, + "documentation":null, + "data_input_associations":[], + "data_output_associations":[], + "io_specification":null, + "typename":"NoneTask", + "extensions":{} + }, + "Event_1xk7z3g":{ + "name":"Event_1xk7z3g", + "description":"Default End Event", + "manual":false, + "lookahead":2, + "inputs":[ + "any_task" + ], + "outputs":[ + "main.EndJoin" + ], + "bpmn_id":"Event_1xk7z3g", + "bpmn_name":null, + "lane":null, + "documentation":null, + "data_input_associations":[], + "data_output_associations":[], + "io_specification":null, + "event_definition":{ + "description":"Default", + "name":null, + "typename":"NoneEventDefinition" + }, + "typename":"EndEvent", + "extensions":{} + } + }, + "io_specification":null, + "data_objects":{}, + "correlation_keys":{}, + "typename":"BpmnProcessSpec" + }, + "subprocess_specs":{}, + "subprocesses":{}, + "bpmn_events":[], + "typename":"BpmnWorkflow" +} diff --git a/tests/SpiffWorkflow/bpmn/serializer/VersionMigrationTest.py b/tests/SpiffWorkflow/bpmn/serializer/VersionMigrationTest.py index 3b914b50..f0f5d33c 100644 --- a/tests/SpiffWorkflow/bpmn/serializer/VersionMigrationTest.py +++ b/tests/SpiffWorkflow/bpmn/serializer/VersionMigrationTest.py @@ -172,3 +172,36 @@ def test_update_nested_data_objects(self): self.assertNotIn('sub_level_data_object_two', call_sub.data_objects) self.assertIn('sub_level_data_object_three', call_sub.data_objects) self.assertNotIn('sub_level_data_object_three', process_sub.data_objects) + +class Version_1_3_Test(BaseTestCase): + + def test_update_mi_states(self): + + wf = self.deserialize_workflow('v1.3-mi-states.json') + + any_task = wf.get_next_task(spec_name='any_task') + task_info = any_task.task_spec.task_info(any_task) + 
instance_map = task_info['instance_map'] + + self.assertEqual(len(wf.get_tasks(state=TaskState.WAITING)), 0) + + ready_tasks = wf.get_tasks(state=TaskState.READY, manual=True) + self.assertEqual(len(ready_tasks), 1) + while len(ready_tasks) > 0: + task = ready_tasks[0] + task_info = task.task_spec.task_info(task) + self.assertEqual(task.task_spec.name, 'any_task [child]') + self.assertIn('input_item', task.data) + self.assertEqual(instance_map[task_info['instance']], str(task.id)) + task.data['output_item'] = task.data['input_item'] * 2 + task.run() + ready_tasks = wf.get_tasks(state=TaskState.READY, manual=True) + wf.refresh_waiting_tasks() + wf.do_engine_steps() + + any_task = wf.get_next_task(spec_name='any_task') + task_info = any_task.task_spec.task_info(any_task) + self.assertEqual(len(task_info['completed']), 3) + self.assertEqual(len(task_info['running']), 0) + self.assertEqual(len(task_info['future']), 0) + self.assertTrue(wf.is_completed())
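
Note: a minimal sketch of what the new migration step does to a stored v1.3
serialization, using the fixture added in this patch. In normal use the serializer
applies the registered migrations automatically during deserialization (as the test
above exercises), so the direct call below is purely illustrative:

    import json

    from SpiffWorkflow.bpmn.serializer.migration.version_1_4 import update_mi_states

    # Load the v1.3 fixture committed above (path relative to the repo root).
    with open('tests/SpiffWorkflow/bpmn/data/serialization/v1.3-mi-states.json') as fh:
        dct = json.load(fh)

    # Under the old polling model a reached-but-unfinished MI task was serialized
    # as WAITING (state 8); the event-based model introduced in this series expects
    # STARTED (state 32), so the migration flips exactly those tasks.
    update_mi_states(dct)

    # The parallel MI task 'any_task' from the fixture is now STARTED.
    assert dct['tasks']['32cf0153-96b2-498f-8969-d0147e26af08']['state'] == 32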