Merge branch 'master' into milatools
nurbal authored Oct 6, 2024
2 parents e96fbe4 + ec45280 commit 81d38a0
Showing 51 changed files with 1,164 additions and 58 deletions.
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -70,13 +70,16 @@ disable = [
"import-outside-toplevel", # These imports are useful to reduce loading times
"too-many-arguments",
"too-many-locals",
"too-many-positional-arguments",
"missing-module-docstring",
"missing-class-docstring",
"missing-function-docstring",
"invalid-name",
"no-else-return", # Bad rule IMO (- OB)
"line-too-long", # Black takes care of line length.
"logging-fstring-interpolation"
"logging-fstring-interpolation",
"duplicate-code",
"too-many-positional-arguments",
]
extension-pkg-whitelist = "pydantic"

1 change: 0 additions & 1 deletion sarc/alerts/cache.py
@@ -22,7 +22,6 @@ class CachedResult:

@dataclass(unsafe_hash=True)
class Timespan:

# Time duration
duration: timedelta

1 change: 0 additions & 1 deletion sarc/alerts/common.py
@@ -315,7 +315,6 @@ class HealthMonitorConfig:
checks: dict[str, TaggedSubclass[HealthCheck]] = field(default_factory=dict)

def __post_init__(self):

all_checks = {}

# Parameterize the checks
46 changes: 46 additions & 0 deletions sarc/alerts/usage_alerts/cluster_response.py
@@ -0,0 +1,46 @@
import logging
from datetime import datetime, time, timedelta

from sarc.client.job import get_available_clusters
from sarc.config import MTL

logger = logging.getLogger(__name__)


def check_cluster_response(time_interval: timedelta = timedelta(days=7)):
"""
Check if we scraped clusters recently.
Log a warning for each cluster that has not been scraped within the last `time_interval`.

Parameters
----------
time_interval: timedelta
Interval of time (ending at the current time) in which we expect each cluster to have been scraped.
If a cluster's latest scrape occurred before this interval, a warning is logged.
Default is 7 days.
"""
# Get current date
current_date = datetime.now(tz=MTL)
# Get the oldest date allowed from now
oldest_allowed_date = current_date - time_interval
# Check each available cluster
for cluster in get_available_clusters():
if cluster.end_date is None:
logger.warning(
f"[{cluster.cluster_name}] no end_date available, cannot check last scraping"
)
else:
# The cluster's latest scraping date should be in `cluster.end_date`.
# NB: We assume the cluster's `end_date` is stored as a date string,
# so we must first convert it to a datetime object.
# `end_date` is parsed the same way as the start/end parameters in `get_jobs()`.
cluster_end_date = datetime.combine(
datetime.strptime(cluster.end_date, "%Y-%m-%d"), time.min
).replace(tzinfo=MTL)
# Now we can check.
if cluster_end_date < oldest_allowed_date:
logger.warning(
f"[{cluster.cluster_name}] no scraping since {cluster_end_date}, "
f"oldest required: {oldest_allowed_date}, "
f"current time: {current_date}"
)
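A minimal calling sketch for this check, assuming SARC is already configured so that `get_available_clusters()` can reach the database; the logging setup and the 3-day interval are illustrative, only the function and module path come from the file above.

import logging
from datetime import timedelta

from sarc.alerts.usage_alerts.cluster_response import check_cluster_response

# Send the check's warnings to stderr.
logging.basicConfig(level=logging.WARNING)

# Warn for every cluster whose latest scrape is older than 3 days.
check_cluster_response(time_interval=timedelta(days=3))
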
129 changes: 129 additions & 0 deletions sarc/alerts/usage_alerts/cluster_scraping.py
@@ -0,0 +1,129 @@
import logging
import sys
from datetime import datetime, timedelta
from typing import List, Optional

import pandas

from sarc.config import MTL
from sarc.jobs.series import compute_time_frames, load_job_series

logger = logging.getLogger(__name__)


def check_nb_jobs_per_cluster_per_time(
time_interval: Optional[timedelta] = timedelta(days=7),
time_unit=timedelta(days=1),
cluster_names: Optional[List[str]] = None,
nb_stddev=2,
verbose=False,
):
"""
Check if we have scraped enough jobs per time unit per cluster over a given time interval.
Log a warning for each cluster where the number of jobs per time unit is lower than a threshold
computed from that cluster's own mean and standard deviation.

Parameters
----------
time_interval: timedelta
If given, only jobs which ran in [now - time_interval, now] will be used for checking.
Default is last 7 days.
If None, all jobs are used.
time_unit: timedelta
Time unit used to split time_interval into frames when checking cluster usage. Default is 1 day.
cluster_names: list
Optional list of clusters to check.
If empty (or not specified), use all clusters available among jobs retrieved within time_interval.
nb_stddev: int
Number of standard deviations to subtract from the average to compute the warning threshold.
For each cluster, threshold is computed as:
max(0, average - nb_stddev * stddev)
verbose: bool
If True, print supplementary info about cluster statistics.
"""

# Parse time_interval
start, end, clip_time = None, None, False
if time_interval is not None:
end = datetime.now(tz=MTL)
start = end - time_interval
clip_time = True

# Get data frame
df = load_job_series(start=start, end=end, clip_time=clip_time)

# Split data frame into time frames using `time_unit`
tf = compute_time_frames(df, frame_size=time_unit)

# List all available timestamps.
# We will check each timestamp for each cluster.
timestamps = sorted(tf["timestamp"].unique())

# List clusters
if cluster_names:
cluster_names = sorted(cluster_names)
else:
cluster_names = sorted(df["cluster_name"].unique())

# Iter for each cluster.
for cluster_name in cluster_names:
# Select only jobs for current cluster,
# group jobs by timestamp, and count jobs for each timestamp.
f_stats = (
tf[tf["cluster_name"] == cluster_name]
.groupby(["timestamp"])[["job_id"]]
.count()
)

# Create a dataframe with all available timestamps
# and associate each timestamp to 0 jobs by default.
c = (
pandas.DataFrame({"timestamp": timestamps, "count": [0] * len(timestamps)})
.groupby(["timestamp"])[["count"]]
.sum()
)
# For each timestamp where this cluster has jobs, set the real number of jobs scraped.
c.loc[f_stats.index, "count"] = f_stats["job_id"]

# We now have the number of jobs for each timestamp for this cluster,
# with count 0 for timestamps where no jobs ran on the cluster.

# Compute average number of jobs per timestamp for this cluster
avg = c["count"].mean()
# Compute standard deviation of job count per timestamp for this cluster
stddev = c["count"].std()
# Compute threshold to use for warnings: <average> - nb_stddev * <standard deviation>
threshold = max(0, avg - nb_stddev * stddev)
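# Hypothetical worked example of this threshold (illustrative numbers only): with an average
# of 100 jobs per time unit, a standard deviation of 30 and nb_stddev = 2, the threshold is
# max(0, 100 - 2 * 30) = 40, so any time frame with fewer than 40 scraped jobs gets a warning.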

if verbose:
print(f"[{cluster_name}]", file=sys.stderr)
print(c, file=sys.stderr)
print(f"avg {avg}, stddev {stddev}, threshold {threshold}", file=sys.stderr)
print(file=sys.stderr)

if threshold == 0:
# If threshold is zero, no check can be done, as the job count will always be >= 0.
# Instead, we log a general warning.
msg = f"[{cluster_name}] threshold 0 ({avg} - {nb_stddev} * {stddev})."
if len(timestamps) == 1:
msg += (
f" Only 1 timestamp found. Either time_interval ({time_interval}) is too short, "
f"or this cluster should not be currently checked"
)
else:
msg += (
f" Either nb_stddev is too high, time_interval ({time_interval}) is too short, "
f"or this cluster should not be currently checked"
)
logger.warning(msg)
else:
# With a non-null threshold, we can check each timestamp.
for timestamp in timestamps:
nb_jobs = c.loc[timestamp]["count"]
if nb_jobs < threshold:
logger.warning(
f"[{cluster_name}][{timestamp}] "
f"insufficient cluster scraping: {nb_jobs} jobs / cluster / time unit; "
f"minimum required for this cluster: {threshold} ({avg} - {nb_stddev} * {stddev}); "
f"time unit: {time_unit}"
)
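A minimal calling sketch, assuming SARC is configured so that `load_job_series()` can query the jobs database and that logging is set up as in the sketch above; all parameter values are illustrative, only the function and module path come from the file above.

from datetime import timedelta

from sarc.alerts.usage_alerts.cluster_scraping import check_nb_jobs_per_cluster_per_time

# Check the last 14 days in 1-day frames, flagging frames more than 3 standard
# deviations below each cluster's own average, and print per-cluster stats on stderr.
check_nb_jobs_per_cluster_per_time(
    time_interval=timedelta(days=14),
    time_unit=timedelta(days=1),
    nb_stddev=3,
    verbose=True,
)
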
77 changes: 77 additions & 0 deletions sarc/alerts/usage_alerts/gpu_util_per_user.py
@@ -0,0 +1,77 @@
import logging
from datetime import datetime, timedelta
from typing import Optional

from sarc.config import MTL
from sarc.jobs.series import compute_cost_and_waste, load_job_series

logger = logging.getLogger(__name__)


def check_gpu_util_per_user(
threshold: timedelta,
time_interval: Optional[timedelta] = timedelta(days=7),
minimum_runtime: Optional[timedelta] = timedelta(minutes=5),
):
"""
Check whether users make sufficient use of GPUs.
Log a warning for each user whose average GPU-util over their jobs
in the time interval is lower than a given threshold.
For a given user job, GPU-util is computed as
gpu_utilization * gpu_equivalent_cost
(with gpu_equivalent_cost as elapsed_time * allocated.gres_gpu).

Parameters
----------
threshold: timedelta
Minimum value for average GPU-util expected per user.
We assume GPU-util is expressed in GPU-seconds,
thus threshold can be expressed with a timedelta.
time_interval
If given, only jobs which ran in [now - time_interval, now] will be used for checking.
Default is last 7 days.
If None, all jobs are used.
minimum_runtime
If given, only jobs which ran at least for this minimum runtime will be used for checking.
Default is 5 minutes.
If None, set to 0.
"""
# Parse time_interval
start, end, clip_time = None, None, False
if time_interval is not None:
end = datetime.now(tz=MTL)
start = end - time_interval
clip_time = True

# Get data frame. We clip time if start and end are available,
# so that minimum_runtime is compared to the job's running time within the given interval.
df = load_job_series(start=start, end=end, clip_time=clip_time)

# Parse minimum_runtime, and select only jobs where
# elapsed time >= minimum runtime and allocated.gres_gpu > 0
if minimum_runtime is None:
minimum_runtime = timedelta(seconds=0)
df = df[
(df["elapsed_time"] >= minimum_runtime.total_seconds())
& (df["allocated.gres_gpu"] > 0)
]

# Compute cost
df = compute_cost_and_waste(df)

# Compute GPU-util for each job
df["gpu_util"] = df["gpu_utilization"] * df["gpu_equivalent_cost"]

# Compute average GPU-util per user
f_stats = df.groupby(["user"])[["gpu_util"]].mean()

# Now we can check
for row in f_stats.itertuples():
user = row.Index
gpu_util = row.gpu_util
if gpu_util < threshold.total_seconds():
logger.warning(
f"[{user}] insufficient average gpu_util: {gpu_util} GPU-seconds; "
f"minimum required: {threshold} ({threshold.total_seconds()} GPU-seconds)"
)
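A minimal calling sketch, again assuming a working SARC configuration and logging set up as in the first sketch; the 12-hour threshold and 10-minute minimum runtime are arbitrary illustrations, only the function and module path come from the file above.

from datetime import timedelta

from sarc.alerts.usage_alerts.gpu_util_per_user import check_gpu_util_per_user

# Warn for every user whose average GPU-util over the last 7 days is below
# 12 GPU-hours, ignoring jobs shorter than 10 minutes.
check_gpu_util_per_user(
    threshold=timedelta(hours=12),
    minimum_runtime=timedelta(minutes=10),
)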