Added dask memory limits
MitchellAV committed May 14, 2024
1 parent 018744c commit 1698aad
Showing 3 changed files with 95 additions and 16 deletions.
3 changes: 2 additions & 1 deletion workers/src/logging_config.json
@@ -51,7 +51,8 @@
"stdout",
"file",
"json_file"
]
],
"propagate": false
}
}
}
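
For context, a minimal sketch of what the new "propagate": false entry does, assuming a hypothetical logger named "worker" and a single stdout handler (the real config also lists file and JSON handlers): without the flag, records handled by the named logger also bubble up to the root logger and can be emitted twice.

import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "handlers": {
        "stdout": {"class": "logging.StreamHandler", "level": "INFO"},
    },
    "root": {"handlers": ["stdout"], "level": "INFO"},
    "loggers": {
        "worker": {
            "handlers": ["stdout"],
            "level": "INFO",
            "propagate": False,  # keep records from also reaching the root handler
        }
    },
}

logging.config.dictConfig(LOGGING_CONFIG)
logging.getLogger("worker").info("handled once, not duplicated by the root logger")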
12 changes: 4 additions & 8 deletions workers/src/pvinsight-validation-runner.py
@@ -667,9 +667,6 @@ def run_submission(
function_parameters: list[str],
row: pd.Series,
):

logger.info(f"file_name: {file_name}")

# Create master dictionary of all possible function kwargs
kwargs = prepare_kwargs_for_submission_function(
config_data, function_parameters, row, associated_metadata
@@ -722,9 +719,9 @@ def loop_over_files_and_generate_results(
results = dask_multiprocess(
run_submission_and_generate_performance_metrics,
func_arguments_list,
n_workers=2,
threads_per_worker=1,
memory_limit="16GiB",
# n_workers=2,
# threads_per_worker=1,
# memory_limit="16GiB",
logger=logger,
)
return results
@@ -801,7 +798,6 @@ def generate_performance_metrics_for_submission(
np.abs(output_dictionary[val] - ground_truth_dict[val])
)
results_dictionary[metric + "_" + val] = error
logger.info(f"results_dictionary: {results_dictionary}")
return results_dictionary


@@ -818,8 +814,8 @@ def run_submission_and_generate_performance_metrics(
performance_metrics: list[str],
file_number: int,
):
logger.info(f"{file_number} - running submission for file {file_name}")
try:
logger.info(f"{file_number} - running submission for file {file_name}")
# Get file_name, which will be pulled from database or S3 for
# each analysis
(
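
With the explicit n_workers, threads_per_worker and memory_limit arguments commented out, the runner now defers cluster sizing to dask_multiprocess in workers/src/utility.py. A sketch of the call as it now reads (names taken from the diff; the surrounding function body is elided):

results = dask_multiprocess(
    run_submission_and_generate_performance_metrics,
    func_arguments_list,
    logger=logger,
)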
96 changes: 89 additions & 7 deletions workers/src/utility.py
@@ -1,18 +1,21 @@
import sys
from dask.delayed import delayed
from dask.distributed import Client
from dask import config

from concurrent.futures import ProcessPoolExecutor, as_completed, thread
from functools import wraps
from logging import Logger
from time import perf_counter, sleep
from time import perf_counter, sleep, time
import os
from typing import Any, Callable, Tuple, TypeVar, Union
import logging
import boto3
import botocore.exceptions
from distributed import LocalCluster
from matplotlib.pylab import f
import psutil
import requests
import math


WORKER_ERROR_PREFIX = "wr"
@@ -27,9 +30,9 @@ def timing(verbose: bool = True, logger: Union[Logger, None] = None):
def decorator(func: Callable[..., T]):
@wraps(func)
def wrapper(*args, **kwargs) -> Tuple[T, float]:
start_time = perf_counter()
start_time = time()
result = func(*args, **kwargs)
end_time = perf_counter()
end_time = time()
execution_time = end_time - start_time
if verbose:
msg = (
@@ -66,8 +69,9 @@ def dask_multiprocess(
func_arguments: list[tuple[Any, ...]],
n_workers: int | None = None,
threads_per_worker: int | None = None,
memory_limit: str | float | int | None = None,
memory_limit: float | int | None = None,
logger: Logger | None = None,
**kwargs,
) -> T | list[T] | tuple[T, ...]:

# if n_workers is None:
@@ -85,10 +89,88 @@
# if threads_per_worker is None:
# threads_per_worker = None

# if n_workers is None:
# n_workers = cpu_count
# if n_workers * n_processes > cpu_count:
# raise Exception(f"workers and threads exceed local resources, {cpu_count} cores present")
# if n_workers * memory_limit > sys_memory:
# config.set({'distributed.worker.memory.spill': True})
# print(f"Memory per worker exceeds system memory ({memory_limit} GB), activating memory spill\n")

memory_limit = memory_limit or 7.0

cpu_count = os.cpu_count()
# memory limit in GB
sys_memory = psutil.virtual_memory().total / (1024.0**3)

if cpu_count is None:
raise Exception("Could not determine number of CPUs.")

if n_workers is not None and threads_per_worker is not None:
if n_workers * threads_per_worker > cpu_count:
raise Exception(
f"workers and threads exceed local resources, {cpu_count} cores present"
)
if memory_limit * n_workers * threads_per_worker > sys_memory:
config.set({"distributed.worker.memory.spill": True})
print(
f"Memory per worker exceeds system memory ({memory_limit} GB), activating memory spill\n"
)

if n_workers is not None and threads_per_worker is None:
threads_per_worker = int(
math.floor(sys_memory / (memory_limit * n_workers))
)
if threads_per_worker == 0:
print(
"Not enough memory for a worker, defaulting to 1 thread per worker"
)
threads_per_worker = 1

if n_workers is None and threads_per_worker is not None:
n_workers = int(
math.floor(sys_memory / (memory_limit * threads_per_worker))
)
if n_workers == 0:
print("Not enough memory for a worker, defaulting to 1 worker")
n_workers = 1

if n_workers is None and threads_per_worker is None:
if memory_limit == 0:
raise Exception("Memory limit cannot be 0")
thread_worker_total = sys_memory / memory_limit
if thread_worker_total < 2:
print(
"Not enough memory for a worker, defaulting to 1 worker and 1 thread per worker"
)
n_workers = 1
threads_per_worker = 1
if memory_limit * n_workers > sys_memory:
config.set({"distributed.worker.memory.spill": True})
print(
f"Memory per worker exceeds system memory ({memory_limit} GB), activating memory spill\n"
)
else:
print(f"thread_worker_total: {thread_worker_total}")
n_workers = int(math.floor(thread_worker_total / 2))
threads_per_worker = int(math.floor(thread_worker_total / 2))

# config.set({"distributed.worker.memory.spill": True})
config.set({"distributed.worker.memory.pause": True})
config.set({"distributed.worker.memory.target": 0.95})
config.set({"distributed.worker.memory.terminate": False})

print(f"cpu count: {cpu_count}")
print(f"memory: {sys_memory}")
print(f"memory limit per worker: {memory_limit}")
print(f"n_workers: {n_workers}")
print(f"threads_per_worker: {threads_per_worker}")

client = Client(
n_workers=n_workers,
threads_per_worker=threads_per_worker,
memory_limit=memory_limit,
memory_limit=f"{memory_limit}GiB",
**kwargs,
)

# LocalCluster()
@@ -107,7 +189,7 @@

lazy_results = []
for args in func_arguments:
lazy_result = delayed(func)(*args)
lazy_result = delayed(func, pure=True)(*args)
lazy_results.append(lazy_result)

futures = client.compute(lazy_results)
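
A standalone sketch of the sizing arithmetic the new code applies when neither n_workers nor threads_per_worker is supplied, assuming a hypothetical 32 GiB machine (the real code reads psutil.virtual_memory() and os.cpu_count()); the 7.0 GiB fallback for memory_limit comes from the diff above.

import math

memory_limit = 7.0   # GiB per worker; the fallback used when memory_limit is None
sys_memory = 32.0    # GiB of system memory, assumed here for illustration

thread_worker_total = sys_memory / memory_limit          # ~4.57
if thread_worker_total < 2:
    # not enough memory for two slots: fall back to a single worker and thread
    n_workers = threads_per_worker = 1
else:
    n_workers = int(math.floor(thread_worker_total / 2))           # 2
    threads_per_worker = int(math.floor(thread_worker_total / 2))  # 2

print(n_workers, threads_per_worker)  # 2 2 -> Client(..., memory_limit="7.0GiB")

The delayed(func, pure=True) change additionally marks each task as deterministic, so dask keys tasks by a hash of the function and its arguments and can reuse the result of identical calls.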
