Skip to content

Commit

Permalink
Merge pull request #5910 from cylc/8.2.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.2.x-sync into master
  • Loading branch information
oliver-sanders authored Jan 9, 2024
2 parents df350cd + 8002595 commit a7cf51b
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 179 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ jobs:
time-zone: 'XXX-09:35'

env:
# Use non-UTC time zone
TZ: ${{ matrix.time-zone }}
PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes

Expand Down
1 change: 1 addition & 0 deletions changes.d/5893.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug in computing a time interval-based runahead limit when future triggers are present.
16 changes: 0 additions & 16 deletions cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,22 +415,6 @@ def get_stop_point(self):
"""Return the last point of this sequence, or None if unbounded."""
pass

def get_first_n_points(self, n, point=None):
"""Return a list of first n points of this sequence."""
if point is None:
p1 = self.get_start_point()
else:
p1 = self.get_first_point(point)
if p1 is None:
return []
result = [p1]
for _ in range(1, n):
p1 = self.get_next_point_on_sequence(p1)
if p1 is None:
break
result.append(p1)
return result

@abstractmethod
def __eq__(self, other) -> bool:
# Return True if other (sequence) is equal to self.
Expand Down
16 changes: 13 additions & 3 deletions cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,12 +947,22 @@ def _interval_parse(interval_string):

def point_parse(point_string: str) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object."""
return _point_parse(point_string, WorkflowSpecifics.DUMP_FORMAT)
return _point_parse(
point_string,
WorkflowSpecifics.DUMP_FORMAT,
WorkflowSpecifics.ASSUMED_TIME_ZONE
)


@lru_cache(10000)
def _point_parse(point_string, _dump_fmt):
"""Parse a point_string into a proper TimePoint object."""
def _point_parse(point_string: str, _dump_fmt, _tz) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object.
Args:
point_string: The string to parse.
_dump_fmt: Dump format (only used to avoid invalid cache hits).
_tz: Cycle point time zone (only used to avoid invalid cache hits).
"""
if "%" in WorkflowSpecifics.DUMP_FORMAT:
# May be a custom not-quite ISO 8601 dump format.
with contextlib.suppress(IsodatetimeError):
Expand Down
118 changes: 45 additions & 73 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,82 +307,48 @@ def compute_runahead(self, force=False) -> bool:
With force=True we recompute the limit even if the base point has not
changed (needed if max_future_offset changed, or on reload).
"""
"""
limit = self.config.runahead_limit # e.g. P2 or P2D
count_cycles = False
with suppress(TypeError):
# Count cycles (integer cycling, and optional for datetime too).
ilimit = int(limit) # type: ignore
count_cycles = True

base_point: 'PointBase'
points: List['PointBase'] = []
base_point: Optional['PointBase'] = None

# First get the runahead base point.
if not self.main_pool:
# No tasks yet, just consider sequence points.
if count_cycles:
# Get the first ilimit points in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for plist in [
seq.get_first_n_points(
ilimit, self.config.start_point)
for seq in self.config.sequences
]
for point in plist
]
# Drop points beyond the limit.
points = sorted(points)[:ilimit + 1]
base_point = min(points)

else:
# Start at first point in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
base_point = min(points)
# Drop points beyond the limit.
points = [
point
for point in points
if point <= base_point + limit
]

# Find the earliest sequence point beyond the workflow start point.
base_point = min(
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
)
else:
# Find the earliest point with unfinished tasks.
# Find the earliest point with incomplete tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
# All n=0 tasks are incomplete by definition, but Cylc 7
# ignores failed ones (it does not ignore submit-failed!).
if (
points # got the limit already so this point too
or any(
not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
)
or (
# For Cylc 7 back-compat, ignore incomplete tasks.
# (Success is required in back-compat mode, so
# failedtasks end up as incomplete; and Cylc 7
# ignores failed tasks in computing the limit).
itask.state.outputs.is_incomplete()
and not cylc.flow.flags.cylc7_back_compat
)
cylc.flow.flags.cylc7_back_compat and
all(
itask.state(TASK_STATUS_FAILED)
for itask in itasks
)
):
points.append(point)
continue
base_point = point
break

if not points:
return False
base_point = min(points)
if base_point is None:
return False

LOG.debug(f"Runahead: base point {base_point}")

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point
Expand All @@ -399,8 +365,10 @@ def compute_runahead(self, force=False) -> bool:
# change or the runahead limit is already at stop point.
return False

# Get all cycle points possible after the base point.
sequence_points: Set['PointBase']
# Now generate all possible cycle points from the base point and stop
# at the runahead limit point. Note both cycle count and time interval
# limits involve all possible cycles, not just active cycles.
sequence_points: Set['PointBase'] = set()
if (
not force
and self._prev_runahead_sequence_points
Expand All @@ -410,44 +378,48 @@ def compute_runahead(self, force=False) -> bool:
sequence_points = self._prev_runahead_sequence_points
else:
# Recompute possible points.
sequence_points = set()
for sequence in self.config.sequences:
seq_point = sequence.get_next_point(base_point)
seq_point = sequence.get_first_point(base_point)
count = 1
while seq_point is not None:
if count_cycles:
# P0 allows only the base cycle point to run.
if count > 1 + ilimit:
# this point may be beyond the runahead limit
break
else:
# PT0H allows only the base cycle point to run.
if seq_point > base_point + limit:
# this point can not be beyond the runahead limit
break
count += 1
sequence_points.add(seq_point)
seq_point = sequence.get_next_point(seq_point)
self._prev_runahead_sequence_points = sequence_points
self._prev_runahead_base_point = base_point

points = set(points).union(sequence_points)

if count_cycles:
# Some sequences may have different intervals.
limit_point = sorted(points)[:(ilimit + 1)][-1]
# (len(list) may be less than ilimit due to sequence end)
limit_point = sorted(sequence_points)[:ilimit + 1][-1]
else:
# We already stopped at the runahead limit.
limit_point = sorted(points)[-1]
limit_point = max(sequence_points)

# Adjust for future offset and stop point, if necessary.
# Adjust for future offset and stop point.
pre_adj_limit = limit_point
if self.max_future_offset is not None:
limit_point += self.max_future_offset
LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)")
LOG.debug(
"Runahead (future trigger adjust):"
f" {pre_adj_limit} -> {limit_point}"
)
if self.stop_point and limit_point > self.stop_point:
limit_point = self.stop_point
LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)")
LOG.debug(f"Runahead limit: {limit_point}")
LOG.debug(
"Runahead (stop point adjust):"
f" {pre_adj_limit} -> {limit_point} (stop point)"
)

LOG.debug(f"Runahead limit: {limit_point}")
self.runahead_limit_point = limit_point
return True

Expand Down
Loading

0 comments on commit a7cf51b

Please sign in to comment.