Skip to content

Commit

Permalink
state totals adjustment
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Aug 1, 2023
1 parent 0e62b33 commit f10aa7b
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ def __init__(self, schd):
self.prune_flagged_nodes = set()
self.pruned_task_proxies = set()
self.updates_pending = False
self.next_update_pending = False
self.publish_pending = False

def initiate_data_model(self, reloaded=False):
Expand Down Expand Up @@ -1657,14 +1658,13 @@ def update_data_structure(self, reloaded=False):
if self.state_update_families:
self.update_family_proxies()

next_update_pending = False
if self.updates_pending:
# Update workflow statuses and totals if needed
self.update_workflow()

# Don't process updated deltas of pruned nodes
if self.pruned_task_proxies:
next_update_pending = True
self.next_update_pending = True
self.prune_pruned_updated_nodes()

# Apply current deltas
Expand All @@ -1680,7 +1680,8 @@ def update_data_structure(self, reloaded=False):
# Gather this batch of deltas for publish
self.publish_deltas = self.get_publish_deltas()

self.updates_pending = next_update_pending
self.updates_pending = self.next_update_pending
self.next_update_pending = False

# Clear deltas
self.clear_delta_batch()
Expand Down Expand Up @@ -1749,9 +1750,12 @@ def prune_data_store(self):
node_ids, parent_ids, checked_ids, self.family_pruned_ids)
if self.family_pruned_ids:
self.deltas[FAMILY_PROXIES].pruned.extend(self.family_pruned_ids)
self.next_update_pending = True
if node_ids:
self.pruned_task_proxies.update(node_ids)
self.updates_pending = True
if self.deltas[TASK_PROXIES].pruned:
self.next_update_pending = True

def _family_ascent_point_prune(
self, fp_id, node_ids, parent_ids, checked_ids, prune_ids):
Expand Down Expand Up @@ -1844,6 +1848,12 @@ def update_family_proxies(self):
next(iter(self.state_update_families)))
if self.updated_state_families:
self.updates_pending = True
if (
self.deltas[FAMILY_PROXIES].pruned
or self.deltas[TASK_PROXIES].pruned
):
self.state_update_families.update(self.updated_state_families)
self.next_updates_pending = True

def _family_ascent_point_update(self, fp_id):
"""Updates the given family and children recursively.
Expand Down Expand Up @@ -1894,6 +1904,7 @@ def _family_ascent_point_update(self, fp_id):
state_counter += Counter(dict(child_node.state_totals))
# Gather all child task states
task_states = []
# Filter out bound for prunning
for tp_id in fam_node.child_tasks:

tp_delta = tp_updated.get(tp_id)
Expand Down Expand Up @@ -1983,6 +1994,8 @@ def update_workflow(self):
for n in self.added[FAMILY_PROXIES].values()
if n.name == 'root']
):
if root_id in self.family_pruned_ids:
continue
root_node_updated = self.updated[FAMILY_PROXIES].get(root_id)
if root_node_updated is not None and root_node_updated.state:
root_node = root_node_updated
Expand Down

0 comments on commit f10aa7b

Please sign in to comment.