diff --git a/cluster_tools/cluster_tools/__init__.py b/cluster_tools/cluster_tools/__init__.py index 46a07c542..679e2d0b8 100644 --- a/cluster_tools/cluster_tools/__init__.py +++ b/cluster_tools/cluster_tools/__init__.py @@ -18,7 +18,7 @@ from cluster_tools.executors.sequential import SequentialExecutor from cluster_tools.schedulers.cluster_executor import ( ClusterExecutor, - RemoteOutOfMemoryException, + RemoteResourceLimitException, ) from cluster_tools.schedulers.kube import KubernetesExecutor from cluster_tools.schedulers.pbs import PBSExecutor diff --git a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index ebfeb02c6..2de8e515b 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -57,7 +57,7 @@ def __str__(self) -> str: return self.error.strip() + f" (job_id={self.job_id})" -class RemoteOutOfMemoryException(RemoteException): +class RemoteResourceLimitException(RemoteException): def __str__(self) -> str: return str(self.job_id) + "\n" + self.error.strip() diff --git a/cluster_tools/cluster_tools/schedulers/slurm.py b/cluster_tools/cluster_tools/schedulers/slurm.py index 1956c1aa4..66c7ad24e 100644 --- a/cluster_tools/cluster_tools/schedulers/slurm.py +++ b/cluster_tools/cluster_tools/schedulers/slurm.py @@ -29,7 +29,7 @@ NOT_YET_SUBMITTED_STATE, ClusterExecutor, RemoteException, - RemoteOutOfMemoryException, + RemoteResourceLimitException, ) SLURM_STATES = { @@ -411,69 +411,110 @@ def matches_states(slurm_states: List[str]) -> bool: def investigate_failed_job( self, job_id_with_index: str ) -> Optional[Tuple[str, Type[RemoteException]]]: - # We call `seff job_id` which should return some output including a line, - # such as: "Memory Efficiency: 25019.18% of 1.00 GB" + # This function tries to find the reason for a failed job by first checking whether + # the job run time exceeded the specified time limit. If that is not the case, it + # checks whether the job used too much RAM. As a last resort, it checks the exit code + # of the job: If the job was killed with signal 9, it's very likely due to some + # resource limit. + + def parse_key_value_pairs( + pair_delimiter: str, key_value_delimiter: str + ) -> Dict[str, str]: + properties = {} + for key_value_pair in stdout.split(pair_delimiter): + if key_value_delimiter not in key_value_pair: + continue + key, value = key_value_pair.split(key_value_delimiter, 1) + properties[key.strip()] = value.strip() + return properties + + # Call `scontrol show jobid=` which should return some output including + # a key=value pairs such as: "Reason=...", "TimeLimit=...", and "RunTime=..." + stdout, _, exit_code = call(f"scontrol show jobid={job_id_with_index}") + + if exit_code == 0: + # Parse stdout into a key-value object + properties = parse_key_value_pairs(" ", "=") + + investigation = self._investigate_time_limit(properties) + if investigation: + return investigation + # Call `seff job_id` which should return some output including a line, + # such as: "Memory Efficiency: 25019.18% of 1.00 GB" stdout, _, exit_code = call(f"seff {job_id_with_index}") if exit_code != 0: return None # Parse stdout into a key-value object - properties = {} - for line in stdout.split("\n"): - if ":" not in line: - continue - key, value = line.split(":", 1) - properties[key.strip()] = value.strip() - - def investigate_memory_consumption() -> ( - Optional[Tuple[str, Type[RemoteOutOfMemoryException]]] - ): - if not properties.get("Memory Efficiency", None): - return None - - # Extract the "25019.18% of 1.00 GB" part of the line - efficiency_note = properties["Memory Efficiency"] - PERCENTAGE_REGEX = r"([0-9]+(\.[0-9]+)?)%" - - # Extract the percentage to see whether it exceeds 100%. - match = re.search(PERCENTAGE_REGEX, efficiency_note) - percentage = None - if match is None: - return None - - try: - percentage = float(match.group(1)) - except ValueError: - return None - - if percentage < 100: - return None - - reason = f"The job was probably terminated because it consumed too much memory ({efficiency_note})." - return (reason, RemoteOutOfMemoryException) - - def investigate_exit_code() -> ( - Optional[Tuple[str, Type[RemoteOutOfMemoryException]]] - ): - if not properties.get("State", None): - return None - if "exit code 137" not in properties["State"]: - return None - reason = ( - "The job was probably terminated because it consumed too " - "much memory (at least, the exit code 137 suggests this). Please " - "use the `seff` utility to inspect the failed job and its potential " - "job siblings (in case of an array job) to doublecheck the memory " - "consumption." - ) - return (reason, RemoteOutOfMemoryException) + properties = parse_key_value_pairs("\n", ":") - investigation = investigate_memory_consumption() + investigation = self._investigate_memory_consumption(properties) if investigation: return investigation - return investigate_exit_code() + return self._investigate_exit_code(properties) + + def _investigate_time_limit( + self, properties: Dict[str, str] + ) -> Optional[Tuple[str, Type[RemoteResourceLimitException]]]: + reason = properties.get("Reason", None) + if not reason: + return None + + if reason != "TimeLimit": + return None + + time_limit = properties.get("TimeLimit", None) + run_time = properties.get("RunTime", None) + time_limit_note = f"Time Limit: {time_limit} Run Time: {run_time}" + + reason = f"The job was probably terminated because it ran for too long ({time_limit_note})." + return (reason, RemoteResourceLimitException) + + def _investigate_memory_consumption( + self, properties: Dict[str, str] + ) -> Optional[Tuple[str, Type[RemoteResourceLimitException]]]: + if not properties.get("Memory Efficiency", None): + return None + + # Extract the "25019.18% of 1.00 GB" part of the line + efficiency_note = properties["Memory Efficiency"] + PERCENTAGE_REGEX = r"([0-9]+(\.[0-9]+)?)%" + + # Extract the percentage to see whether it exceeds 100%. + match = re.search(PERCENTAGE_REGEX, efficiency_note) + percentage = None + if match is None: + return None + + try: + percentage = float(match.group(1)) + except ValueError: + return None + + if percentage < 100: + return None + + reason = f"The job was probably terminated because it consumed too much memory ({efficiency_note})." + return (reason, RemoteResourceLimitException) + + def _investigate_exit_code( + self, properties: Dict[str, str] + ) -> Optional[Tuple[str, Type[RemoteResourceLimitException]]]: + if not properties.get("State", None): + return None + # For exit codes >128, subtract 128 to obtain the linux signal number which is SIGKILL (9) in this case + if "exit code 137" not in properties["State"]: + return None + reason = ( + "The job was probably terminated because it consumed too much memory " + "or ran for too long (at least, the exit code 137 suggests this). Please " + "use the `seff` utility to inspect the failed job and its potential " + "job siblings (in case of an array job) to doublecheck the memory " + "consumption and run time." + ) + return (reason, RemoteResourceLimitException) def get_pending_tasks(self) -> Iterable[str]: try: diff --git a/cluster_tools/dockered-slurm/docker-compose.yml b/cluster_tools/dockered-slurm/docker-compose.yml index 139f89445..a0a195439 100644 --- a/cluster_tools/dockered-slurm/docker-compose.yml +++ b/cluster_tools/dockered-slurm/docker-compose.yml @@ -22,6 +22,7 @@ services: volumes: - etc_munge:/etc/munge - etc_slurm:/etc/slurm + - ${PWD}/slurm.conf:/etc/slurm/slurm.conf - var_log_slurm:/var/log/slurm expose: - "6819" @@ -38,6 +39,7 @@ services: volumes: - etc_munge:/etc/munge - etc_slurm:/etc/slurm + - ${PWD}/slurm.conf:/etc/slurm/slurm.conf - ./slurm_jobdir:/data - ..:/cluster_tools - var_log_slurm:/var/log/slurm @@ -55,6 +57,7 @@ services: volumes: - etc_munge:/etc/munge - etc_slurm:/etc/slurm + - ${PWD}/slurm.conf:/etc/slurm/slurm.conf - ./slurm_jobdir:/data - ..:/cluster_tools - var_log_slurm:/var/log/slurm @@ -71,6 +74,7 @@ services: volumes: - etc_munge:/etc/munge - etc_slurm:/etc/slurm + - ${PWD}/slurm.conf:/etc/slurm/slurm.conf - ./slurm_jobdir:/data - ..:/cluster_tools - var_log_slurm:/var/log/slurm diff --git a/cluster_tools/dockered-slurm/slurm.conf b/cluster_tools/dockered-slurm/slurm.conf index 4fef3ed7c..d3afd5c35 100644 --- a/cluster_tools/dockered-slurm/slurm.conf +++ b/cluster_tools/dockered-slurm/slurm.conf @@ -77,6 +77,7 @@ JobCompLoc=/var/log/slurm/jobcomp.log # ACCOUNTING JobAcctGatherType=jobacct_gather/linux JobAcctGatherFrequency=30 +JobAcctGatherParams=OverMemoryKill # AccountingStorageType=accounting_storage/slurmdbd AccountingStorageHost=slurmdbd diff --git a/cluster_tools/tests/test_slurm.py b/cluster_tools/tests/test_slurm.py index b4e9386de..8f3a58d84 100644 --- a/cluster_tools/tests/test_slurm.py +++ b/cluster_tools/tests/test_slurm.py @@ -6,6 +6,7 @@ import os import shutil import signal +import sys import tempfile import time from collections import Counter @@ -29,6 +30,13 @@ def sleep(duration: float) -> float: return duration +def allocate(duration: float, num_bytes: int) -> int: + time.sleep(duration) + data = b"\x00" * num_bytes + time.sleep(duration) + return sys.getsizeof(data) + + logging.basicConfig() @@ -284,11 +292,70 @@ def test_slurm_max_array_size() -> None: assert all(array_size <= max_array_size for array_size in occurences) finally: - chcall(f"sed -i 's/{command}//g' /etc/slurm/slurm.conf && scontrol reconfigure") + chcall( + f"sed -ci 's/{command}//g' /etc/slurm/slurm.conf && scontrol reconfigure" + ) reset_max_array_size = executor.get_max_array_size() assert reset_max_array_size == original_max_array_size +# TODO: Comment back in after the test ran through in the CI +# @pytest.mark.skip( +# reason="This test takes more than a minute and is disabled by default. Execute it when modifying the RemoteResourceLimitException code." +# ) +def test_slurm_time_limit() -> None: + # Time limit resolution is 1 minute, so request 1 minute + executor = cluster_tools.get_executor( + "slurm", debug=True, job_resources={"time": "0-00:01:00"} + ) + + with executor: + # Schedule a job that runs for more than 1 minute + futures = executor.map_to_futures(sleep, [80]) + concurrent.futures.wait(futures) + + # Job should have been killed with a RemoteResourceLimitException + assert all( + isinstance(fut.exception(), cluster_tools.RemoteResourceLimitException) + for fut in futures + ) + + +def test_slurm_memory_limit() -> None: + # Request 1 MB + executor = cluster_tools.get_executor( + "slurm", debug=True, job_resources={"mem": "1M"} + ) + + original_gather_frequency_config = "JobAcctGatherFrequency=30" # from slurm.conf + new_gather_frequency_config = "JobAcctGatherFrequency=1" + + try: + # Increase the frequency at which slurm checks whether a job uses too much memory + chcall( + f"sed -ci 's/{original_gather_frequency_config}/{new_gather_frequency_config}/g' /etc/slurm/slurm.conf && scontrol reconfigure" + ) + + with executor: + # Schedule a job that allocates more than 1 MB and let it run for more than 1 second + # because the frequency of the memory polling is 1 second + duration = 3 + futures = executor.map_to_futures( + partial(allocate, duration), [1024 * 1024 * 2] + ) + concurrent.futures.wait(futures) + + # Job should have been killed with a RemoteResourceLimitException + assert all( + isinstance(fut.exception(), cluster_tools.RemoteResourceLimitException) + for fut in futures + ) + finally: + chcall( + f"sed -ci 's/{new_gather_frequency_config}/{original_gather_frequency_config}/g' /etc/slurm/slurm.conf && scontrol reconfigure" + ) + + def test_slurm_max_array_size_env() -> None: max_array_size = 2