From da67f54efcbf89cb6cd85d61674dc192d918af0e Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Mon, 31 Jul 2023 23:16:15 +1200 Subject: [PATCH] state totals adjustment --- cylc/flow/data_store_mgr.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index d04187e6e99..0bcf964e8e5 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -505,6 +505,7 @@ def __init__(self, schd): self.prune_flagged_nodes = set() self.pruned_task_proxies = set() self.updates_pending = False + self.next_update_pending = False self.publish_pending = False def initiate_data_model(self, reloaded=False): @@ -1656,14 +1657,13 @@ def update_data_structure(self, reloaded=False): if self.state_update_families: self.update_family_proxies() - next_update_pending = False if self.updates_pending: # Update workflow statuses and totals if needed self.update_workflow() # Don't process updated deltas of pruned nodes if self.pruned_task_proxies: - next_update_pending = True + self.next_update_pending = True self.prune_pruned_updated_nodes() # Apply current deltas @@ -1679,7 +1679,8 @@ def update_data_structure(self, reloaded=False): # Gather this batch of deltas for publish self.publish_deltas = self.get_publish_deltas() - self.updates_pending = next_update_pending + self.updates_pending = self.next_update_pending + self.next_update_pending = False # Clear deltas self.clear_delta_batch() @@ -1748,9 +1749,12 @@ def prune_data_store(self): node_ids, parent_ids, checked_ids, self.family_pruned_ids) if self.family_pruned_ids: self.deltas[FAMILY_PROXIES].pruned.extend(self.family_pruned_ids) + self.next_update_pending = True if node_ids: self.pruned_task_proxies.update(node_ids) self.updates_pending = True + if self.deltas[TASK_PROXIES].pruned: + self.next_update_pending = True def _family_ascent_point_prune( self, fp_id, node_ids, parent_ids, checked_ids, prune_ids): @@ -1843,6 +1847,12 @@ def update_family_proxies(self): next(iter(self.state_update_families))) if self.updated_state_families: self.updates_pending = True + if ( + self.deltas[FAMILY_PROXIES].pruned + or self.deltas[TASK_PROXIES].pruned + ): + self.state_update_families.update(self.updated_state_families) + self.next_updates_pending = True def _family_ascent_point_update(self, fp_id): """Updates the given family and children recursively. @@ -1893,6 +1903,7 @@ def _family_ascent_point_update(self, fp_id): state_counter += Counter(dict(child_node.state_totals)) # Gather all child task states task_states = [] + # Filter out bound for prunning for tp_id in fam_node.child_tasks: tp_delta = tp_updated.get(tp_id) @@ -1982,6 +1993,8 @@ def update_workflow(self): for n in self.added[FAMILY_PROXIES].values() if n.name == 'root'] ): + if root_id in self.family_pruned_ids: + continue root_node_updated = self.updated[FAMILY_PROXIES].get(root_id) if root_node_updated is not None and root_node_updated.state: root_node = root_node_updated