From 68482008c4ee68b9bf52422a27a5c7c63ae0ca4c Mon Sep 17 00:00:00 2001 From: John Sirois Date: Fri, 11 Oct 2024 11:01:59 -0700 Subject: [PATCH] Plumb Pip cache pruning based on pruned wheels. --- pex/cache/dirs.py | 49 ++++++++++++ pex/cli/commands/cache/command.py | 120 ++++++++++++++++++++++++++---- pex/pip/installation.py | 52 +++++++++---- pex/pip/tool.py | 22 +++++- pex/venv/venv_pex.py | 2 + 5 files changed, 215 insertions(+), 30 deletions(-) diff --git a/pex/cache/dirs.py b/pex/cache/dirs.py index df428cc04..da04d1e28 100644 --- a/pex/cache/dirs.py +++ b/pex/cache/dirs.py @@ -13,6 +13,8 @@ if TYPE_CHECKING: from typing import Any, Iterable, Iterator, List, Optional, Type, TypeVar, Union + from pex.pip.version import PipVersionValue + class CacheDir(Enum["CacheDir.Value"]): class Value(Enum.Value): @@ -344,6 +346,7 @@ def short_dir(self): class InstalledWheelDir(AtomicCacheDir): @classmethod def iter_all(cls, pex_root=ENV): + # type: (Union[str, Variables]) -> Iterator[InstalledWheelDir] symlinks = [] # type: List[str] dirs = [] # type: List[str] for path in glob.glob(CacheDir.INSTALLED_WHEELS.path("*", "*.whl", pex_root=pex_root)): @@ -462,3 +465,49 @@ def __init__( # type: (...) -> None super(UserCodeDir, self).__init__(path) self.code_hash = code_hash + + +class PipPexDir(AtomicCacheDir): + @classmethod + def iter_all(cls, pex_root=ENV): + # type: (Union[str, Variables]) -> Iterator[PipPexDir] + + from pex.pip.version import PipVersion + + for version_dir in glob.glob(CacheDir.PIP.path("*", pex_root=pex_root)): + version = PipVersion.for_value(os.path.basename(version_dir)) + cache_dir = os.path.join(version_dir, "pip_cache") + for pex_dir in glob.glob(os.path.join(version_dir, "pip.pex", "*", "*")): + yield cls( + path=pex_dir, + version=version, + cache_dir=cache_dir, + ) + + @classmethod + def create( + cls, + version, # type: PipVersionValue + fingerprint, # type: str + ): + # type: (...) -> PipPexDir + + from pex.third_party import isolated + + base_dir = CacheDir.PIP.path(str(version)) + return cls( + path=os.path.join(base_dir, "pip.pex", isolated().pex_hash, fingerprint), + version=version, + cache_dir=os.path.join(base_dir, "pip_cache"), + ) + + def __init__( + self, + path, # type: str + version, # type: PipVersionValue + cache_dir, # type: str + ): + # type: (...) -> None + super(PipPexDir, self).__init__(path) + self.version = version + self.cache_dir = cache_dir diff --git a/pex/cli/commands/cache/command.py b/pex/cli/commands/cache/command.py index 45dbfda0c..bdb6d5a42 100644 --- a/pex/cli/commands/cache/command.py +++ b/pex/cli/commands/cache/command.py @@ -6,6 +6,7 @@ import os import re from argparse import Action, ArgumentError, _ActionsContainer +from collections import Counter from datetime import datetime, timedelta from pex.cache import access as cache_access @@ -16,14 +17,18 @@ from pex.cli.commands.cache.du import DiskUsage from pex.commands.command import OutputMixin from pex.common import pluralize, safe_rmtree +from pex.dist_metadata import ProjectNameAndVersion from pex.exceptions import reportable_unexpected_error_msg -from pex.jobs import iter_map_parallel, map_parallel +from pex.jobs import SpawnedJob, execute_parallel, iter_map_parallel, map_parallel from pex.orderedset import OrderedSet +from pex.pip.installation import iter_all as iter_all_pips +from pex.pip.tool import Pip from pex.result import Error, Ok, Result from pex.typing import TYPE_CHECKING from pex.variables import ENV if TYPE_CHECKING: + import typing from typing import IO, Dict, Iterable, List, Optional, Tuple, Union import attr # vendor:skip @@ -492,10 +497,14 @@ def _prune(self): print(file=fp) def prune_unused_deps(additional=False): - with cache_data.prune(list(InstalledWheelDir.iter_all())) as unused_deps_iter: - disk_usages = list( + # type: (bool) -> Iterable[InstalledWheelDir] + with cache_data.prune(tuple(InstalledWheelDir.iter_all())) as unused_deps_iter: + unused_wheels = tuple( + dep for dep in unused_deps_iter if isinstance(dep, InstalledWheelDir) + ) + disk_usages = tuple( iter_map_parallel( - unused_deps_iter, + unused_wheels, self._prune_cache_dir, noun="cached PEX dependency", verb="prune", @@ -513,9 +522,88 @@ def prune_unused_deps(additional=False): ) print(self._render_usage(disk_usages)) print(file=fp) + return unused_wheels + + def prune_pip_caches(wheels): + # type: (Iterable[InstalledWheelDir]) -> None + + prunable_wheels = set() + for wheel in wheels: + prunable_pnav = ProjectNameAndVersion.from_filename(wheel.wheel_name) + prunable_wheels.add( + (prunable_pnav.canonicalized_project_name, prunable_pnav.canonicalized_version) + ) + + def spawn_list(pip): + # type: (Pip) -> SpawnedJob[Tuple[ProjectNameAndVersion, ...]] + return SpawnedJob.stdout( + job=pip.spawn_cache_list(), + result_func=lambda stdout: tuple( + ProjectNameAndVersion.from_filename(wheel_file) + for wheel_file in stdout.decode("utf-8").splitlines() + if wheel_file + ), + ) + + all_pips = tuple(iter_all_pips()) + pip_removes = [] # type: List[Tuple[Pip, str]] + for pip, project_name_and_versions in zip( + all_pips, execute_parallel(inputs=all_pips, spawn_func=spawn_list) + ): + for pnav in project_name_and_versions: + if ( + pnav.canonicalized_project_name, + pnav.canonicalized_version, + ) in prunable_wheels: + pip_removes.append( + ( + pip, + "{project_name}-{version}*".format( + project_name=pnav.project_name, version=pnav.version + ), + ) + ) + + def spawn_remove(args): + # type: (Tuple[Pip, str]) -> SpawnedJob[int] + pip, wheel_name_glob = args + # Files removed: 1 + return SpawnedJob.stdout( + job=pip.spawn_cache_remove(wheel_name_glob), + result_func=lambda stdout: int( + stdout.decode("utf-8").rsplit(":", 1)[1].strip() + ), + ) + + removes_by_pip = Counter() # type: typing.Counter[str] + for pip, remove_count in zip( + [pip for pip, _ in pip_removes], + execute_parallel(inputs=pip_removes, spawn_func=spawn_remove), + ): + removes_by_pip[pip.version.value] += remove_count + if removes_by_pip: + total = sum(removes_by_pip.values()) + print( + "Pruned {total} cached {wheels} from {count} Pip {version}:".format( + total=total, + wheels=pluralize(total, "wheel"), + count=len(removes_by_pip), + version=pluralize(removes_by_pip, "version"), + ), + file=fp, + ) + for pip_version, remove_count in sorted(removes_by_pip.items()): + print( + "Pip {version}: removed {remove_count} {wheels}".format( + version=pip_version, + remove_count=remove_count, + wheels=pluralize(remove_count, "wheel"), + ), + file=fp, + ) cutoff = self.options.cutoff - pex_dirs = list(cache_access.last_access_before(cutoff.cutoff)) + pex_dirs = tuple(cache_access.last_access_before(cutoff.cutoff)) if not pex_dirs: print( "There are no cached PEX zipapps or venvs last accessed prior to {cutoff}.".format( @@ -528,7 +616,8 @@ def prune_unused_deps(additional=False): file=fp, ) print(file=fp) - prune_unused_deps() + unused_wheels = prune_unused_deps() + prune_pip_caches(unused_wheels) return Ok() print( @@ -541,7 +630,7 @@ def prune_unused_deps(additional=False): ) print( self._render_usage( - list( + tuple( iter_map_parallel( pex_dirs, self._prune_cache_dir, @@ -556,7 +645,7 @@ def prune_unused_deps(additional=False): print(file=fp) if self.options.dry_run: - deps = list(cache_data.dir_dependencies(pex_dirs)) + deps = tuple(cache_data.dir_dependencies(pex_dirs)) print( "Might have pruned up to {count} {cached_pex_dependency}.".format( count=len(deps), cached_pex_dependency=pluralize(deps, "cached PEX dependency") @@ -565,7 +654,7 @@ def prune_unused_deps(additional=False): ) print( self._render_usage( - list( + tuple( iter_map_parallel( deps, self._prune_cache_dir, @@ -579,11 +668,15 @@ def prune_unused_deps(additional=False): print(file=fp) else: with cache_data.delete(pex_dirs) as deps_iter: - deps = list(deps_iter) + deps = tuple(deps_iter) with cache_data.prune(deps) as prunable_deps_iter: - disk_usages = list( + unused_deps = tuple(prunable_deps_iter) + unused_wheels = OrderedSet( + dep for dep in unused_deps if isinstance(dep, InstalledWheelDir) + ) + disk_usages = tuple( iter_map_parallel( - prunable_deps_iter, + unused_deps, self._prune_cache_dir, noun="cached PEX dependency", verb="prune", @@ -613,5 +706,6 @@ def prune_unused_deps(additional=False): if disk_usages: print(self._render_usage(disk_usages)) print(file=fp) - prune_unused_deps(additional=len(disk_usages) > 0) + unused_wheels.update(prune_unused_deps(additional=len(disk_usages) > 0)) + prune_pip_caches(unused_wheels) return Ok() diff --git a/pex/pip/installation.py b/pex/pip/installation.py index d9ce87118..561ee6556 100644 --- a/pex/pip/installation.py +++ b/pex/pip/installation.py @@ -10,9 +10,10 @@ from pex import pex_warnings, third_party from pex.atomic_directory import atomic_directory -from pex.cache.dirs import CacheDir +from pex.cache.dirs import PipPexDir from pex.common import REPRODUCIBLE_BUILDS_ENV, pluralize, safe_mkdtemp from pex.dist_metadata import Requirement +from pex.exceptions import production_assert from pex.interpreter import PythonInterpreter from pex.orderedset import OrderedSet from pex.pep_503 import ProjectName @@ -23,10 +24,10 @@ from pex.resolve.resolvers import Resolver from pex.result import Error, try_ from pex.targets import LocalInterpreter, RequiresPythonError, Targets -from pex.third_party import isolated from pex.tracer import TRACER from pex.typing import TYPE_CHECKING from pex.util import named_temporary_file +from pex.variables import ENV, Variables from pex.venv.virtualenv import InstallationChoice, Virtualenv if TYPE_CHECKING: @@ -37,6 +38,25 @@ from pex.third_party import attr +def _create_pip( + pip_pex, # type: PipPexDir + interpreter=None, # type: Optional[PythonInterpreter] + use_system_time=False, # type: bool +): + # type: (...) -> Pip + + production_assert(os.path.exists(pip_pex.path)) + + pip_interpreter = interpreter or PythonInterpreter.get() + venv_pex = ensure_venv(PEX(pip_pex.path, interpreter=pip_interpreter)) + pip_venv = PipVenv( + venv_dir=venv_pex.venv_dir, + execute_env=REPRODUCIBLE_BUILDS_ENV if not use_system_time else {}, + execute_args=tuple(venv_pex.execute_args()), + ) + return Pip(pip=pip_venv, version=pip_pex.version, pip_cache=pip_pex.cache_dir) + + def _pip_installation( version, # type: PipVersionValue iter_distribution_locations, # type: Callable[[], Iterator[str]] @@ -45,11 +65,9 @@ def _pip_installation( use_system_time=False, # type: bool ): # type: (...) -> Pip - pip_root = CacheDir.PIP.path(str(version)) - path = os.path.join(pip_root, "pip.pex") - pip_interpreter = interpreter or PythonInterpreter.get() - pip_pex_path = os.path.join(path, isolated().pex_hash, fingerprint) - with atomic_directory(pip_pex_path) as chroot: + + pip_pex = PipPexDir.create(version, fingerprint) + with atomic_directory(pip_pex.path) as chroot: if not chroot.is_finalized(): from pex.pex_builder import PEXBuilder @@ -80,14 +98,7 @@ def _pip_installation( fp.close() isolated_pip_builder.set_executable(fp.name, "__pex_patched_pip__.py") isolated_pip_builder.freeze() - pip_cache = os.path.join(pip_root, "pip_cache") - pip_pex = ensure_venv(PEX(pip_pex_path, interpreter=pip_interpreter)) - pip_venv = PipVenv( - venv_dir=pip_pex.venv_dir, - execute_env=REPRODUCIBLE_BUILDS_ENV if not use_system_time else {}, - execute_args=tuple(pip_pex.execute_args()), - ) - return Pip(pip=pip_venv, version=version, pip_cache=pip_cache) + return _create_pip(pip_pex, interpreter=interpreter, use_system_time=use_system_time) def _fingerprint(requirements): @@ -433,3 +444,14 @@ def get_pip( ) _PIP[installation] = pip return pip + + +def iter_all( + interpreter=None, # type: Optional[PythonInterpreter] + use_system_time=False, # type: bool + pex_root=ENV, # type: Union[str, Variables] +): + # type: (...) -> Iterator[Pip] + + for pip_pex in PipPexDir.iter_all(pex_root=pex_root): + yield _create_pip(pip_pex, interpreter=interpreter, use_system_time=use_system_time) diff --git a/pex/pip/tool.py b/pex/pip/tool.py index 25de49db9..61d6d556d 100644 --- a/pex/pip/tool.py +++ b/pex/pip/tool.py @@ -272,8 +272,8 @@ def analyze(self, line): @attr.s(frozen=True) class PipVenv(object): venv_dir = attr.ib() # type: str - execute_env = attr.ib() # type: Mapping[str, str] - _execute_args = attr.ib() # type: Tuple[str, ...] + execute_env = attr.ib(factory=dict) # type: Mapping[str, str] + _execute_args = attr.ib(default=()) # type: Tuple[str, ...] def execute_args(self, *args): # type: (*str) -> List[str] @@ -721,3 +721,21 @@ def spawn_debug( return self._spawn_pip_isolated_job( debug_command, log=log, pip_verbosity=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + + def spawn_cache_remove(self, wheel_name_glob): + # type: (str) -> Job + return self._spawn_pip_isolated_job( + args=["cache", "remove", wheel_name_glob], + pip_verbosity=1, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + def spawn_cache_list(self): + # type: () -> Job + return self._spawn_pip_isolated_job( + args=["cache", "list", "--format", "abspath"], + pip_verbosity=1, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) diff --git a/pex/venv/venv_pex.py b/pex/venv/venv_pex.py index 787f941e2..daed27e8e 100644 --- a/pex/venv/venv_pex.py +++ b/pex/venv/venv_pex.py @@ -169,6 +169,8 @@ def maybe_log(*message): "_PEX_SCIE_INSTALLED_PEX_DIR", # This is used to override PBS distribution URLs in lazy PEX scies. "PEX_BOOTSTRAP_URLS", + # This is used to support `pex3 cache {prune,purge}`. + "_PEX_CACHE_ACCESS_LOCK", ) ] if ignored_pex_env_vars: