Add detection for exceeded time limits for slurm jobs.
Add tests to check that job kills due to memory or time limits are
detected by cluster_tools.
Use a new version of the slurm docker image.
daniel-wer committed Jul 20, 2023
1 parent dc8b301 commit e8a486a
Showing 6 changed files with 170 additions and 57 deletions.
2 changes: 1 addition & 1 deletion cluster_tools/cluster_tools/__init__.py
@@ -18,7 +18,7 @@
from cluster_tools.executors.sequential import SequentialExecutor
from cluster_tools.schedulers.cluster_executor import (
ClusterExecutor,
RemoteOutOfMemoryException,
RemoteResourceLimitException,
)
from cluster_tools.schedulers.kube import KubernetesExecutor
from cluster_tools.schedulers.pbs import PBSExecutor
2 changes: 1 addition & 1 deletion cluster_tools/cluster_tools/schedulers/cluster_executor.py
@@ -57,7 +57,7 @@ def __str__(self) -> str:
return self.error.strip() + f" (job_id={self.job_id})"


class RemoteOutOfMemoryException(RemoteException):
class RemoteResourceLimitException(RemoteException):
def __str__(self) -> str:
return str(self.job_id) + "\n" + self.error.strip()

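For illustration, a minimal usage sketch of how calling code can react to the renamed exception (not part of this commit; the work function and resource values are made up, but the executor calls mirror the tests added below):

import concurrent.futures

import cluster_tools


def work(x: int) -> int:
    return x * x


# Request deliberately tight resources so that a limit violation is plausible.
executor = cluster_tools.get_executor(
    "slurm", debug=True, job_resources={"mem": "1M", "time": "0-00:01:00"}
)

with executor:
    futures = executor.map_to_futures(work, [2, 3])
    concurrent.futures.wait(futures)
    for fut in futures:
        exc = fut.exception()
        if isinstance(exc, cluster_tools.RemoteResourceLimitException):
            # The scheduler killed the job because of a memory or time limit.
            print(f"Job hit a resource limit: {exc}")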
149 changes: 95 additions & 54 deletions cluster_tools/cluster_tools/schedulers/slurm.py
@@ -29,7 +29,7 @@
NOT_YET_SUBMITTED_STATE,
ClusterExecutor,
RemoteException,
RemoteOutOfMemoryException,
RemoteResourceLimitException,
)

SLURM_STATES = {
@@ -411,69 +411,110 @@ def matches_states(slurm_states: List[str]) -> bool:
def investigate_failed_job(
self, job_id_with_index: str
) -> Optional[Tuple[str, Type[RemoteException]]]:
# We call `seff job_id` which should return some output including a line,
# such as: "Memory Efficiency: 25019.18% of 1.00 GB"
# This function tries to find the reason for a failed job by first checking whether
# the job run time exceeded the specified time limit. If that is not the case, it
# checks whether the job used too much RAM. As a last resort, it checks the exit code
# of the job: If the job was killed with signal 9, it's very likely due to some
# resource limit.

def parse_key_value_pairs(
pair_delimiter: str, key_value_delimiter: str
) -> Dict[str, str]:
properties = {}
for key_value_pair in stdout.split(pair_delimiter):
if key_value_delimiter not in key_value_pair:
continue
key, value = key_value_pair.split(key_value_delimiter, 1)
properties[key.strip()] = value.strip()
return properties

# Call `scontrol show jobid=<job_id>` which should return some output including
# a key=value pairs such as: "Reason=...", "TimeLimit=...", and "RunTime=..."
stdout, _, exit_code = call(f"scontrol show jobid={job_id_with_index}")

if exit_code == 0:
# Parse stdout into a key-value object
properties = parse_key_value_pairs(" ", "=")

investigation = self._investigate_time_limit(properties)
if investigation:
return investigation

# Call `seff job_id` which should return some output including a line,
# such as: "Memory Efficiency: 25019.18% of 1.00 GB"
stdout, _, exit_code = call(f"seff {job_id_with_index}")
if exit_code != 0:
return None

# Parse stdout into a key-value object
properties = {}
for line in stdout.split("\n"):
if ":" not in line:
continue
key, value = line.split(":", 1)
properties[key.strip()] = value.strip()

def investigate_memory_consumption() -> (
Optional[Tuple[str, Type[RemoteOutOfMemoryException]]]
):
if not properties.get("Memory Efficiency", None):
return None

# Extract the "25019.18% of 1.00 GB" part of the line
efficiency_note = properties["Memory Efficiency"]
PERCENTAGE_REGEX = r"([0-9]+(\.[0-9]+)?)%"

# Extract the percentage to see whether it exceeds 100%.
match = re.search(PERCENTAGE_REGEX, efficiency_note)
percentage = None
if match is None:
return None

try:
percentage = float(match.group(1))
except ValueError:
return None

if percentage < 100:
return None

reason = f"The job was probably terminated because it consumed too much memory ({efficiency_note})."
return (reason, RemoteOutOfMemoryException)

def investigate_exit_code() -> (
Optional[Tuple[str, Type[RemoteOutOfMemoryException]]]
):
if not properties.get("State", None):
return None
if "exit code 137" not in properties["State"]:
return None
reason = (
"The job was probably terminated because it consumed too "
"much memory (at least, the exit code 137 suggests this). Please "
"use the `seff` utility to inspect the failed job and its potential "
"job siblings (in case of an array job) to doublecheck the memory "
"consumption."
)
return (reason, RemoteOutOfMemoryException)
properties = parse_key_value_pairs("\n", ":")

investigation = investigate_memory_consumption()
investigation = self._investigate_memory_consumption(properties)
if investigation:
return investigation

return investigate_exit_code()
return self._investigate_exit_code(properties)

def _investigate_time_limit(
self, properties: Dict[str, str]
) -> Optional[Tuple[str, Type[RemoteResourceLimitException]]]:
reason = properties.get("Reason", None)
if not reason:
return None

if reason != "TimeLimit":
return None

time_limit = properties.get("TimeLimit", None)
run_time = properties.get("RunTime", None)
time_limit_note = f"Time Limit: {time_limit} Run Time: {run_time}"

reason = f"The job was probably terminated because it ran for too long ({time_limit_note})."
return (reason, RemoteResourceLimitException)

def _investigate_memory_consumption(
self, properties: Dict[str, str]
) -> Optional[Tuple[str, Type[RemoteResourceLimitException]]]:
if not properties.get("Memory Efficiency", None):
return None

# Extract the "25019.18% of 1.00 GB" part of the line
efficiency_note = properties["Memory Efficiency"]
PERCENTAGE_REGEX = r"([0-9]+(\.[0-9]+)?)%"

# Extract the percentage to see whether it exceeds 100%.
match = re.search(PERCENTAGE_REGEX, efficiency_note)
percentage = None
if match is None:
return None

try:
percentage = float(match.group(1))
except ValueError:
return None

if percentage < 100:
return None

reason = f"The job was probably terminated because it consumed too much memory ({efficiency_note})."
return (reason, RemoteResourceLimitException)

def _investigate_exit_code(
self, properties: Dict[str, str]
) -> Optional[Tuple[str, Type[RemoteResourceLimitException]]]:
if not properties.get("State", None):
return None
# For exit codes > 128, subtract 128 to obtain the Linux signal number, which is SIGKILL (9) in this case
if "exit code 137" not in properties["State"]:
return None
reason = (
"The job was probably terminated because it consumed too much memory "
"or ran for too long (at least, the exit code 137 suggests this). Please "
"use the `seff` utility to inspect the failed job and its potential "
"job siblings (in case of an array job) to double-check the memory "
"consumption and run time."
)
return (reason, RemoteResourceLimitException)

def get_pending_tasks(self) -> Iterable[str]:
try:
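For context, a standalone sketch of the parsing approach used above (not part of this commit; the scontrol and seff outputs are illustrative samples, not captured from a real cluster):

import re
from typing import Dict


def parse_key_value_pairs(
    stdout: str, pair_delimiter: str, key_value_delimiter: str
) -> Dict[str, str]:
    # Collect key/value pairs from scheduler output, skipping tokens without a delimiter.
    properties = {}
    for key_value_pair in stdout.split(pair_delimiter):
        if key_value_delimiter not in key_value_pair:
            continue
        key, value = key_value_pair.split(key_value_delimiter, 1)
        properties[key.strip()] = value.strip()
    return properties


# Illustrative `scontrol show jobid=<job_id>` output (heavily abbreviated).
scontrol_stdout = "JobId=42 JobState=TIMEOUT Reason=TimeLimit RunTime=00:01:12 TimeLimit=00:01:00"
properties = parse_key_value_pairs(scontrol_stdout, " ", "=")
assert properties["Reason"] == "TimeLimit"  # would trigger the time limit check

# Illustrative `seff <job_id>` output (abbreviated).
seff_stdout = "State: FAILED (exit code 137)\nMemory Efficiency: 25019.18% of 1.00 GB"
properties = parse_key_value_pairs(seff_stdout, "\n", ":")
match = re.search(r"([0-9]+(\.[0-9]+)?)%", properties["Memory Efficiency"])
assert match is not None and float(match.group(1)) >= 100  # would trigger the memory check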
4 changes: 4 additions & 0 deletions cluster_tools/dockered-slurm/docker-compose.yml
@@ -22,6 +22,7 @@ services:
volumes:
- etc_munge:/etc/munge
- etc_slurm:/etc/slurm
- ${PWD}/slurm.conf:/etc/slurm/slurm.conf
- var_log_slurm:/var/log/slurm
expose:
- "6819"
@@ -38,6 +39,7 @@
volumes:
- etc_munge:/etc/munge
- etc_slurm:/etc/slurm
- ${PWD}/slurm.conf:/etc/slurm/slurm.conf
- ./slurm_jobdir:/data
- ..:/cluster_tools
- var_log_slurm:/var/log/slurm
@@ -55,6 +57,7 @@
volumes:
- etc_munge:/etc/munge
- etc_slurm:/etc/slurm
- ${PWD}/slurm.conf:/etc/slurm/slurm.conf
- ./slurm_jobdir:/data
- ..:/cluster_tools
- var_log_slurm:/var/log/slurm
@@ -71,6 +74,7 @@
volumes:
- etc_munge:/etc/munge
- etc_slurm:/etc/slurm
- ${PWD}/slurm.conf:/etc/slurm/slurm.conf
- ./slurm_jobdir:/data
- ..:/cluster_tools
- var_log_slurm:/var/log/slurm
1 change: 1 addition & 0 deletions cluster_tools/dockered-slurm/slurm.conf
@@ -77,6 +77,7 @@ JobCompLoc=/var/log/slurm/jobcomp.log
# ACCOUNTING
JobAcctGatherType=jobacct_gather/linux
JobAcctGatherFrequency=30
JobAcctGatherParams=OverMemoryKill
#
AccountingStorageType=accounting_storage/slurmdbd
AccountingStorageHost=slurmdbd
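As a debugging aid, a small sketch (hypothetical helper, not part of this commit) that checks via scontrol whether the running cluster actually picked up the OverMemoryKill setting:

import subprocess


def over_memory_kill_enabled() -> bool:
    # `scontrol show config` prints the live configuration, one parameter per line.
    output = subprocess.run(
        ["scontrol", "show", "config"], capture_output=True, text=True, check=True
    ).stdout
    for line in output.splitlines():
        if line.strip().startswith("JobAcctGatherParams"):
            return "OverMemoryKill" in line
    return False


print(over_memory_kill_enabled())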
69 changes: 68 additions & 1 deletion cluster_tools/tests/test_slurm.py
@@ -6,6 +6,7 @@
import os
import shutil
import signal
import sys
import tempfile
import time
from collections import Counter
@@ -29,6 +30,13 @@ def sleep(duration: float) -> float:
return duration


def allocate(duration: float, num_bytes: int) -> int:
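# Sleep before and after allocating so that slurm's periodic memory accounting
# (see JobAcctGatherFrequency in slurm.conf) gets a chance to sample the allocation.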
time.sleep(duration)
data = b"\x00" * num_bytes
time.sleep(duration)
return sys.getsizeof(data)


logging.basicConfig()


@@ -284,11 +292,70 @@ def test_slurm_max_array_size() -> None:

assert all(array_size <= max_array_size for array_size in occurences)
finally:
chcall(f"sed -i 's/{command}//g' /etc/slurm/slurm.conf && scontrol reconfigure")
chcall(
f"sed -ci 's/{command}//g' /etc/slurm/slurm.conf && scontrol reconfigure"
)
reset_max_array_size = executor.get_max_array_size()
assert reset_max_array_size == original_max_array_size


# TODO: Comment back in after the test has run through the CI
# @pytest.mark.skip(
# reason="This test takes more than a minute and is disabled by default. Execute it when modifying the RemoteResourceLimitException code."
# )
def test_slurm_time_limit() -> None:
# Time limit resolution is 1 minute, so request 1 minute
executor = cluster_tools.get_executor(
"slurm", debug=True, job_resources={"time": "0-00:01:00"}
)

with executor:
# Schedule a job that runs for more than 1 minute
futures = executor.map_to_futures(sleep, [80])
concurrent.futures.wait(futures)

# Job should have been killed with a RemoteResourceLimitException
assert all(
isinstance(fut.exception(), cluster_tools.RemoteResourceLimitException)
for fut in futures
)


def test_slurm_memory_limit() -> None:
# Request 1 MB
executor = cluster_tools.get_executor(
"slurm", debug=True, job_resources={"mem": "1M"}
)

original_gather_frequency_config = "JobAcctGatherFrequency=30" # from slurm.conf
new_gather_frequency_config = "JobAcctGatherFrequency=1"

try:
# Increase the frequency at which slurm checks whether a job uses too much memory
chcall(
f"sed -ci 's/{original_gather_frequency_config}/{new_gather_frequency_config}/g' /etc/slurm/slurm.conf && scontrol reconfigure"
)

with executor:
# Schedule a job that allocates more than 1 MB and runs for more than 1 second
# because the frequency of the memory polling is 1 second
duration = 3
futures = executor.map_to_futures(
partial(allocate, duration), [1024 * 1024 * 2]
)
concurrent.futures.wait(futures)

# Job should have been killed with a RemoteResourceLimitException
assert all(
isinstance(fut.exception(), cluster_tools.RemoteResourceLimitException)
for fut in futures
)
finally:
chcall(
f"sed -ci 's/{new_gather_frequency_config}/{original_gather_frequency_config}/g' /etc/slurm/slurm.conf && scontrol reconfigure"
)


def test_slurm_max_array_size_env() -> None:
max_array_size = 2

