From e1fde0c4ad95ee1aecbd574d4df9f23492f8fc1e Mon Sep 17 00:00:00 2001 From: David Manthey Date: Wed, 22 May 2024 16:58:00 -0400 Subject: [PATCH] Better detect available memory in containers. Use cgroup information if proc has it available. --- CHANGELOG.md | 3 ++ examples/algorithm_progression.py | 4 +- .../rest/large_image_resource.py | 2 +- .../models/annotationelement.py | 2 +- large_image/cache_util/cachefactory.py | 14 +----- large_image/config.py | 47 +++++++++++++++++++ large_image/tilesource/base.py | 2 +- large_image/tilesource/utilities.py | 29 +----------- .../large_image_source_openjpeg/__init__.py | 2 +- .../large_image_converter/__init__.py | 5 +- 10 files changed, 62 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d67fb047..da42221bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 1.28.3 +### Improvements +- Better detect available memory in containers ([#1532](../../pull/1532)) + ### Changes - Log more when saving annotations ([#1525](../../pull/1525)) - Thumbnail generation jobs are less blocking ([#1528](../../pull/1528), [#1530](../../pull/1530)) diff --git a/examples/algorithm_progression.py b/examples/algorithm_progression.py index 6444c5d33..578b32602 100755 --- a/examples/algorithm_progression.py +++ b/examples/algorithm_progression.py @@ -371,10 +371,10 @@ def main(argv): argparser = create_argparser() args = argparser.parse_args(argv[1:]) if args.num_workers < 1: - args.num_workers = large_image.tilesource.utilities.cpu_count(False) + args.num_workers = large_image.config.cpu_count(False) if os.environ.get('VIPS_CONCURRENCY') is None: os.environ['VIPS_CONCURRENCY'] = str(max( - 1, large_image.tilesource.utilities.cpu_count(False) // args.num_workers)) + 1, large_image.config.cpu_count(False) // args.num_workers)) if args.multiprocessing and os.environ.get('LARGE_IMAGE_CACHE_PYTHON_MEMORY_PORTION') is None: os.environ['LARGE_IMAGE_CACHE_PYTHON_MEMORY_PORTION'] = str(32 * args.num_workers) diff --git a/girder/girder_large_image/rest/large_image_resource.py b/girder/girder_large_image/rest/large_image_resource.py index e9d5faa44..ab544980f 100644 --- a/girder/girder_large_image/rest/large_image_resource.py +++ b/girder/girder_large_image/rest/large_image_resource.py @@ -142,7 +142,7 @@ def createThumbnailsJobThread(job): # noqa job, log='Started creating large image thumbnails\n', status=JobStatus.RUNNING) concurrency = int(job['kwargs'].get('concurrent', 0)) - concurrency = large_image.tilesource.utilities.cpu_count( + concurrency = large_image.config.cpu_count( logical=True) if concurrency < 1 else concurrency status = { 'checked': 0, diff --git a/girder_annotation/girder_large_image_annotation/models/annotationelement.py b/girder_annotation/girder_large_image_annotation/models/annotationelement.py index 26142d364..452a1290c 100644 --- a/girder_annotation/girder_large_image_annotation/models/annotationelement.py +++ b/girder_annotation/girder_large_image_annotation/models/annotationelement.py @@ -609,7 +609,7 @@ def updateElements(self, annotation): if not len(elements): return now = datetime.datetime.now(datetime.timezone.utc) - threads = large_image.tilesource.utilities.cpu_count() + threads = large_image.config.cpu_count() chunkSize = int(max(100000 // threads, 10000)) with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as pool: for chunk in range(0, len(elements), chunkSize): diff --git a/large_image/cache_util/cachefactory.py b/large_image/cache_util/cachefactory.py index 1174ab995..a369364f0 100644 --- a/large_image/cache_util/cachefactory.py +++ b/large_image/cache_util/cachefactory.py @@ -16,18 +16,11 @@ import math import threading +from importlib.metadata import entry_points from typing import Dict, Optional, Tuple, Type import cachetools -try: - import psutil - HAS_PSUTIL = True -except ImportError: - HAS_PSUTIL = False - -from importlib.metadata import entry_points - from .. import config from ..exceptions import TileCacheError from .memcache import MemCache @@ -92,10 +85,7 @@ def pickAvailableCache( if configMaxItems > 0: maxItems = configMaxItems # Estimate usage based on (1 / portion) of the total virtual memory. - if HAS_PSUTIL: - memory = psutil.virtual_memory().total - else: - memory = 1024 ** 3 + memory = config.total_memory() numItems = max(int(math.floor(memory / portion / sizeEach)), 2) if maxItems: numItems = min(numItems, maxItems) diff --git a/large_image/config.py b/large_image/config.py index 251fe403b..cbdaea090 100644 --- a/large_image/config.py +++ b/large_image/config.py @@ -132,3 +132,50 @@ def _ignoreSourceNames( return None if re.search(ignored_names, os.path.basename(path), flags=re.IGNORECASE): raise exceptions.TileSourceError('File will not be opened by %s reader' % configKey) + + +def cpu_count(logical: bool = True) -> int: + """ + Get the usable CPU count. If psutil is available, it is used, since it can + determine the number of physical CPUS versus logical CPUs. This returns + the smaller of that value from psutil and the number of cpus allowed by the + os scheduler, which means that for physical requests (logical=False), the + returned value may be more the the number of physical cpus that are usable. + + :param logical: True to get the logical usable CPUs (which include + hyperthreading). False for the physical usable CPUs. + :returns: the number of usable CPUs. + """ + count = os.cpu_count() or 2 + try: + count = min(count, len(os.sched_getaffinity(0))) + except AttributeError: + pass + try: + import psutil + + count = min(count, psutil.cpu_count(logical)) + except ImportError: + pass + return max(1, count) + + +def total_memory() -> int: + """ + Get the total memory in the system. If this is in a container, try to + determine the memory available to the cgroup. + + :returns: the available memory in bytes, or 8 GB if unknown. + """ + mem = 0 + if HAS_PSUTIL: + mem = psutil.virtual_memory().total + try: + cgroup = int(open('/sys/fs/cgroup/memory/memory.limit_in_bytes').read().strip()) + if 1024 ** 3 <= cgroup < 1024 ** 4 and (mem is None or cgroup < mem): + mem = cgroup + except Exception: + pass + if mem: + return mem + return 8 * 1024 ** 3 diff --git a/large_image/tilesource/base.py b/large_image/tilesource/base.py index 3fdc6a0d2..2bad507c1 100644 --- a/large_image/tilesource/base.py +++ b/large_image/tilesource/base.py @@ -1965,7 +1965,7 @@ def tileFrames( image = None tiledimage = None if max_workers is not None and max_workers < 0: - max_workers = min(-max_workers, utilities.cpu_count(False)) + max_workers = min(-max_workers, config.cpu_count(False)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool: futures = [] for idx, frame in enumerate(frameList): diff --git a/large_image/tilesource/utilities.py b/large_image/tilesource/utilities.py index b46c191aa..618c3b5a8 100644 --- a/large_image/tilesource/utilities.py +++ b/large_image/tilesource/utilities.py @@ -1,6 +1,5 @@ import io import math -import os import threading import types import xml.etree.ElementTree @@ -17,6 +16,8 @@ from ..constants import dtypeToGValue +# This was exposed here, once. + try: import simplejpeg except ImportError: @@ -1213,32 +1214,6 @@ def histogramThreshold(histogram: Dict[str, Any], threshold: float, fromMax: boo return result -def cpu_count(logical: bool = True) -> int: - """ - Get the usable CPU count. If psutil is available, it is used, since it can - determine the number of physical CPUS versus logical CPUs. This returns - the smaller of that value from psutil and the number of cpus allowed by the - os scheduler, which means that for physical requests (logical=False), the - returned value may be more the the number of physical cpus that are usable. - - :param logical: True to get the logical usable CPUs (which include - hyperthreading). False for the physical usable CPUs. - :returns: the number of usable CPUs. - """ - count = os.cpu_count() or 2 - try: - count = min(count, len(os.sched_getaffinity(0))) - except AttributeError: - pass - try: - import psutil - - count = min(count, psutil.cpu_count(logical)) - except ImportError: - pass - return max(1, count) - - def addPILFormatsToOutputOptions() -> None: """ Check PIL for available formats that be saved and add them to the lists of diff --git a/sources/openjpeg/large_image_source_openjpeg/__init__.py b/sources/openjpeg/large_image_source_openjpeg/__init__.py index 342fdbc84..1ba16af3b 100644 --- a/sources/openjpeg/large_image_source_openjpeg/__init__.py +++ b/sources/openjpeg/large_image_source_openjpeg/__init__.py @@ -104,7 +104,7 @@ def __init__(self, path, **kwargs): if not os.path.isfile(self._largeImagePath): raise TileSourceFileNotFoundError(self._largeImagePath) from None raise - glymur.set_option('lib.num_threads', large_image.tilesource.utilities.cpu_count()) + glymur.set_option('lib.num_threads', large_image.config.cpu_count()) self._openjpegHandles = queue.LifoQueue() for _ in range(self._maxOpenHandles - 1): self._openjpegHandles.put(None) diff --git a/utilities/converter/large_image_converter/__init__.py b/utilities/converter/large_image_converter/__init__.py index 3c811c63b..e125a90ad 100644 --- a/utilities/converter/large_image_converter/__init__.py +++ b/utilities/converter/large_image_converter/__init__.py @@ -14,7 +14,6 @@ from tempfile import TemporaryDirectory import numpy as np -import psutil import tifftools import large_image @@ -347,7 +346,7 @@ def _concurrency_to_value(_concurrency=None, **kwargs): _concurrency = int(_concurrency) if str(_concurrency).isdigit() else 0 if _concurrency > 0: return _concurrency - return max(1, large_image.tilesource.utilities.cpu_count(logical=True) + _concurrency) + return max(1, large_image.config.cpu_count(logical=True) + _concurrency) def _get_thread_pool(memoryLimit=None, **kwargs): @@ -359,7 +358,7 @@ def _get_thread_pool(memoryLimit=None, **kwargs): """ concurrency = _concurrency_to_value(**kwargs) if memoryLimit: - concurrency = min(concurrency, psutil.virtual_memory().total // memoryLimit) + concurrency = min(concurrency, large_image.config.total_memory() // memoryLimit) concurrency = max(1, concurrency) return concurrent.futures.ThreadPoolExecutor(max_workers=concurrency)