Skip to content

Commit

Permalink
Plumb Pip cache pruning based on pruned wheels.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsirois committed Oct 11, 2024
1 parent 0f97d5f commit 6848200
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 30 deletions.
49 changes: 49 additions & 0 deletions pex/cache/dirs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
if TYPE_CHECKING:
from typing import Any, Iterable, Iterator, List, Optional, Type, TypeVar, Union

from pex.pip.version import PipVersionValue


class CacheDir(Enum["CacheDir.Value"]):
class Value(Enum.Value):
Expand Down Expand Up @@ -344,6 +346,7 @@ def short_dir(self):
class InstalledWheelDir(AtomicCacheDir):
@classmethod
def iter_all(cls, pex_root=ENV):
# type: (Union[str, Variables]) -> Iterator[InstalledWheelDir]
symlinks = [] # type: List[str]
dirs = [] # type: List[str]
for path in glob.glob(CacheDir.INSTALLED_WHEELS.path("*", "*.whl", pex_root=pex_root)):
Expand Down Expand Up @@ -462,3 +465,49 @@ def __init__(
# type: (...) -> None
super(UserCodeDir, self).__init__(path)
self.code_hash = code_hash


class PipPexDir(AtomicCacheDir):
    """A cached Pip PEX directory along with the Pip version and Pip cache dir it is tied to.

    The directories handled here are laid out beneath the `CacheDir.PIP` cache as:
    `<version>/pip.pex/<isolated pex hash>/<fingerprint>` with a sibling `<version>/pip_cache`
    directory that is shared by every Pip PEX of the same version.
    """

    @classmethod
    def iter_all(cls, pex_root=ENV):
        # type: (Union[str, Variables]) -> Iterator[PipPexDir]
        """Yield a `PipPexDir` for each Pip PEX directory found under the given PEX root."""

        from pex.pip.version import PipVersion

        all_version_dirs = glob.glob(CacheDir.PIP.path("*", pex_root=pex_root))
        for pip_version_dir in all_version_dirs:
            # The name of each directory directly under the Pip cache is a Pip version value.
            pip_version = PipVersion.for_value(os.path.basename(pip_version_dir))
            pip_cache_dir = os.path.join(pip_version_dir, "pip_cache")

            # The two wildcard levels correspond to the isolated pex hash and the fingerprint.
            pip_pex_glob = os.path.join(pip_version_dir, "pip.pex", "*", "*")
            for pip_pex_path in glob.glob(pip_pex_glob):
                yield cls(path=pip_pex_path, version=pip_version, cache_dir=pip_cache_dir)

    @classmethod
    def create(
        cls,
        version,  # type: PipVersionValue
        fingerprint,  # type: str
    ):
        # type: (...) -> PipPexDir
        """Return the `PipPexDir` describing where a Pip PEX of the given version belongs.

        Only paths are calculated here; nothing is created on disk.
        """

        from pex.third_party import isolated

        version_dir = CacheDir.PIP.path(str(version))
        pip_pex_path = os.path.join(version_dir, "pip.pex", isolated().pex_hash, fingerprint)
        pip_cache_dir = os.path.join(version_dir, "pip_cache")
        return cls(path=pip_pex_path, version=version, cache_dir=pip_cache_dir)

    def __init__(
        self,
        path,  # type: str
        version,  # type: PipVersionValue
        cache_dir,  # type: str
    ):
        # type: (...) -> None
        super(PipPexDir, self).__init__(path)

        # The Pip version this Pip PEX provides.
        self.version = version

        # The Pip cache directory used by this version of Pip.
        self.cache_dir = cache_dir
120 changes: 107 additions & 13 deletions pex/cli/commands/cache/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import re
from argparse import Action, ArgumentError, _ActionsContainer
from collections import Counter
from datetime import datetime, timedelta

from pex.cache import access as cache_access
Expand All @@ -16,14 +17,18 @@
from pex.cli.commands.cache.du import DiskUsage
from pex.commands.command import OutputMixin
from pex.common import pluralize, safe_rmtree
from pex.dist_metadata import ProjectNameAndVersion
from pex.exceptions import reportable_unexpected_error_msg
from pex.jobs import iter_map_parallel, map_parallel
from pex.jobs import SpawnedJob, execute_parallel, iter_map_parallel, map_parallel
from pex.orderedset import OrderedSet
from pex.pip.installation import iter_all as iter_all_pips
from pex.pip.tool import Pip
from pex.result import Error, Ok, Result
from pex.typing import TYPE_CHECKING
from pex.variables import ENV

if TYPE_CHECKING:
import typing
from typing import IO, Dict, Iterable, List, Optional, Tuple, Union

import attr # vendor:skip
Expand Down Expand Up @@ -492,10 +497,14 @@ def _prune(self):
print(file=fp)

def prune_unused_deps(additional=False):
with cache_data.prune(list(InstalledWheelDir.iter_all())) as unused_deps_iter:
disk_usages = list(
# type: (bool) -> Iterable[InstalledWheelDir]
with cache_data.prune(tuple(InstalledWheelDir.iter_all())) as unused_deps_iter:
unused_wheels = tuple(
dep for dep in unused_deps_iter if isinstance(dep, InstalledWheelDir)
)
disk_usages = tuple(
iter_map_parallel(
unused_deps_iter,
unused_wheels,
self._prune_cache_dir,
noun="cached PEX dependency",
verb="prune",
Expand All @@ -513,9 +522,88 @@ def prune_unused_deps(additional=False):
)
print(self._render_usage(disk_usages))
print(file=fp)
return unused_wheels

def prune_pip_caches(wheels):
    # type: (Iterable[InstalledWheelDir]) -> None
    """Remove wheels matching the given pruned installed wheels from the cached Pips' caches.

    Each cached Pip is asked to list the wheels in its own cache and any of those wheels whose
    canonicalized project name and version match one of the given `wheels` is removed via
    `pip cache remove`. A summary of the removals, grouped by Pip version, is printed to the
    enclosing scope's `fp`. Nothing is printed when no wheels were removed.

    :param wheels: The installed wheel dirs that have been pruned from the Pex cache.
    """

    prunable_wheels = set()
    for wheel in wheels:
        prunable_pnav = ProjectNameAndVersion.from_filename(wheel.wheel_name)
        prunable_wheels.add(
            (prunable_pnav.canonicalized_project_name, prunable_pnav.canonicalized_version)
        )
    if not prunable_wheels:
        # With nothing pruned there is nothing to match against; avoid materializing every
        # cached Pip and spawning a `pip cache list` job for each just to find that out.
        return

    def spawn_list(pip):
        # type: (Pip) -> SpawnedJob[Tuple[ProjectNameAndVersion, ...]]
        return SpawnedJob.stdout(
            job=pip.spawn_cache_list(),
            result_func=lambda stdout: tuple(
                ProjectNameAndVersion.from_filename(wheel_file)
                for wheel_file in stdout.decode("utf-8").splitlines()
                if wheel_file
            ),
        )

    all_pips = tuple(iter_all_pips())
    pip_removes = []  # type: List[Tuple[Pip, str]]

    # Pips of the same version share a single Pip cache directory and a wheel name glob can match
    # several cached wheel files; so only spawn one remove job per (Pip version, glob) pair to
    # avoid parallel jobs racing each other to delete the same files.
    seen_removes = set()  # type: typing.Set[Tuple[str, str]]
    for pip, project_name_and_versions in zip(
        all_pips, execute_parallel(inputs=all_pips, spawn_func=spawn_list)
    ):
        for pnav in project_name_and_versions:
            if (
                pnav.canonicalized_project_name,
                pnav.canonicalized_version,
            ) not in prunable_wheels:
                continue
            wheel_name_glob = "{project_name}-{version}*".format(
                project_name=pnav.project_name, version=pnav.version
            )
            remove_key = (pip.version.value, wheel_name_glob)
            if remove_key in seen_removes:
                continue
            seen_removes.add(remove_key)
            pip_removes.append((pip, wheel_name_glob))

    def parse_remove_count(stdout):
        # type: (bytes) -> int

        # The summary line looks like `Files removed: 1`, but it can be preceded by per-file
        # output and some Pip versions append extra detail after the count (e.g.:
        # `Files removed: 1 (10 kB)`); so pick the count out of the last summary line instead
        # of assuming it is the only thing following the final colon.
        counts = re.findall(
            r"^Files removed:\s*(\d+)", stdout.decode("utf-8"), flags=re.MULTILINE
        )
        # The count is only used for reporting; so treat a missing summary line as 0 removals
        # instead of failing the whole prune.
        return int(counts[-1]) if counts else 0

    def spawn_remove(args):
        # type: (Tuple[Pip, str]) -> SpawnedJob[int]
        pip, wheel_name_glob = args
        return SpawnedJob.stdout(
            job=pip.spawn_cache_remove(wheel_name_glob), result_func=parse_remove_count
        )

    removes_by_pip = Counter()  # type: typing.Counter[str]
    for pip, remove_count in zip(
        [pip for pip, _ in pip_removes],
        execute_parallel(inputs=pip_removes, spawn_func=spawn_remove),
    ):
        removes_by_pip[pip.version.value] += remove_count
    if removes_by_pip:
        total = sum(removes_by_pip.values())
        print(
            "Pruned {total} cached {wheels} from {count} Pip {version}:".format(
                total=total,
                wheels=pluralize(total, "wheel"),
                count=len(removes_by_pip),
                version=pluralize(removes_by_pip, "version"),
            ),
            file=fp,
        )
        for pip_version, remove_count in sorted(removes_by_pip.items()):
            print(
                "Pip {version}: removed {remove_count} {wheels}".format(
                    version=pip_version,
                    remove_count=remove_count,
                    wheels=pluralize(remove_count, "wheel"),
                ),
                file=fp,
            )

cutoff = self.options.cutoff
pex_dirs = list(cache_access.last_access_before(cutoff.cutoff))
pex_dirs = tuple(cache_access.last_access_before(cutoff.cutoff))
if not pex_dirs:
print(
"There are no cached PEX zipapps or venvs last accessed prior to {cutoff}.".format(
Expand All @@ -528,7 +616,8 @@ def prune_unused_deps(additional=False):
file=fp,
)
print(file=fp)
prune_unused_deps()
unused_wheels = prune_unused_deps()
prune_pip_caches(unused_wheels)
return Ok()

print(
Expand All @@ -541,7 +630,7 @@ def prune_unused_deps(additional=False):
)
print(
self._render_usage(
list(
tuple(
iter_map_parallel(
pex_dirs,
self._prune_cache_dir,
Expand All @@ -556,7 +645,7 @@ def prune_unused_deps(additional=False):
print(file=fp)

if self.options.dry_run:
deps = list(cache_data.dir_dependencies(pex_dirs))
deps = tuple(cache_data.dir_dependencies(pex_dirs))
print(
"Might have pruned up to {count} {cached_pex_dependency}.".format(
count=len(deps), cached_pex_dependency=pluralize(deps, "cached PEX dependency")
Expand All @@ -565,7 +654,7 @@ def prune_unused_deps(additional=False):
)
print(
self._render_usage(
list(
tuple(
iter_map_parallel(
deps,
self._prune_cache_dir,
Expand All @@ -579,11 +668,15 @@ def prune_unused_deps(additional=False):
print(file=fp)
else:
with cache_data.delete(pex_dirs) as deps_iter:
deps = list(deps_iter)
deps = tuple(deps_iter)
with cache_data.prune(deps) as prunable_deps_iter:
disk_usages = list(
unused_deps = tuple(prunable_deps_iter)
unused_wheels = OrderedSet(
dep for dep in unused_deps if isinstance(dep, InstalledWheelDir)
)
disk_usages = tuple(
iter_map_parallel(
prunable_deps_iter,
unused_deps,
self._prune_cache_dir,
noun="cached PEX dependency",
verb="prune",
Expand Down Expand Up @@ -613,5 +706,6 @@ def prune_unused_deps(additional=False):
if disk_usages:
print(self._render_usage(disk_usages))
print(file=fp)
prune_unused_deps(additional=len(disk_usages) > 0)
unused_wheels.update(prune_unused_deps(additional=len(disk_usages) > 0))
prune_pip_caches(unused_wheels)
return Ok()
52 changes: 37 additions & 15 deletions pex/pip/installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@

from pex import pex_warnings, third_party
from pex.atomic_directory import atomic_directory
from pex.cache.dirs import CacheDir
from pex.cache.dirs import PipPexDir
from pex.common import REPRODUCIBLE_BUILDS_ENV, pluralize, safe_mkdtemp
from pex.dist_metadata import Requirement
from pex.exceptions import production_assert
from pex.interpreter import PythonInterpreter
from pex.orderedset import OrderedSet
from pex.pep_503 import ProjectName
Expand All @@ -23,10 +24,10 @@
from pex.resolve.resolvers import Resolver
from pex.result import Error, try_
from pex.targets import LocalInterpreter, RequiresPythonError, Targets
from pex.third_party import isolated
from pex.tracer import TRACER
from pex.typing import TYPE_CHECKING
from pex.util import named_temporary_file
from pex.variables import ENV, Variables
from pex.venv.virtualenv import InstallationChoice, Virtualenv

if TYPE_CHECKING:
Expand All @@ -37,6 +38,25 @@
from pex.third_party import attr


def _create_pip(
    pip_pex,  # type: PipPexDir
    interpreter=None,  # type: Optional[PythonInterpreter]
    use_system_time=False,  # type: bool
):
    # type: (...) -> Pip
    """Create a `Pip` backed by a venv of the already built Pip PEX at `pip_pex.path`.

    :param pip_pex: The cached Pip PEX directory; its path must already exist.
    :param interpreter: The interpreter to run the Pip PEX with; when not supplied,
                        `PythonInterpreter.get()` is used.
    :param use_system_time: If true, Pip is executed with no extra environment variables instead
                            of with `REPRODUCIBLE_BUILDS_ENV`.
    """

    pip_pex_path = pip_pex.path
    production_assert(os.path.exists(pip_pex_path))

    if use_system_time:
        execute_env = {}
    else:
        execute_env = REPRODUCIBLE_BUILDS_ENV

    pex = PEX(pip_pex_path, interpreter=interpreter or PythonInterpreter.get())
    venv_pex = ensure_venv(pex)
    return Pip(
        pip=PipVenv(
            venv_dir=venv_pex.venv_dir,
            execute_env=execute_env,
            execute_args=tuple(venv_pex.execute_args()),
        ),
        version=pip_pex.version,
        pip_cache=pip_pex.cache_dir,
    )


def _pip_installation(
version, # type: PipVersionValue
iter_distribution_locations, # type: Callable[[], Iterator[str]]
Expand All @@ -45,11 +65,9 @@ def _pip_installation(
use_system_time=False, # type: bool
):
# type: (...) -> Pip
pip_root = CacheDir.PIP.path(str(version))
path = os.path.join(pip_root, "pip.pex")
pip_interpreter = interpreter or PythonInterpreter.get()
pip_pex_path = os.path.join(path, isolated().pex_hash, fingerprint)
with atomic_directory(pip_pex_path) as chroot:

pip_pex = PipPexDir.create(version, fingerprint)
with atomic_directory(pip_pex.path) as chroot:
if not chroot.is_finalized():
from pex.pex_builder import PEXBuilder

Expand Down Expand Up @@ -80,14 +98,7 @@ def _pip_installation(
fp.close()
isolated_pip_builder.set_executable(fp.name, "__pex_patched_pip__.py")
isolated_pip_builder.freeze()
pip_cache = os.path.join(pip_root, "pip_cache")
pip_pex = ensure_venv(PEX(pip_pex_path, interpreter=pip_interpreter))
pip_venv = PipVenv(
venv_dir=pip_pex.venv_dir,
execute_env=REPRODUCIBLE_BUILDS_ENV if not use_system_time else {},
execute_args=tuple(pip_pex.execute_args()),
)
return Pip(pip=pip_venv, version=version, pip_cache=pip_cache)
return _create_pip(pip_pex, interpreter=interpreter, use_system_time=use_system_time)


def _fingerprint(requirements):
Expand Down Expand Up @@ -433,3 +444,14 @@ def get_pip(
)
_PIP[installation] = pip
return pip


def iter_all(
    interpreter=None,  # type: Optional[PythonInterpreter]
    use_system_time=False,  # type: bool
    pex_root=ENV,  # type: Union[str, Variables]
):
    # type: (...) -> Iterator[Pip]
    """Yield a `Pip` for each Pip PEX directory found in the cache under `pex_root`.

    The `interpreter` and `use_system_time` arguments are passed through unchanged to the
    creation of each `Pip`.
    """

    pip_options = dict(interpreter=interpreter, use_system_time=use_system_time)
    for cached_pip_pex in PipPexDir.iter_all(pex_root=pex_root):
        yield _create_pip(cached_pip_pex, **pip_options)
22 changes: 20 additions & 2 deletions pex/pip/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ def analyze(self, line):
@attr.s(frozen=True)
class PipVenv(object):
venv_dir = attr.ib() # type: str
execute_env = attr.ib() # type: Mapping[str, str]
_execute_args = attr.ib() # type: Tuple[str, ...]
execute_env = attr.ib(factory=dict) # type: Mapping[str, str]
_execute_args = attr.ib(default=()) # type: Tuple[str, ...]

def execute_args(self, *args):
# type: (*str) -> List[str]
Expand Down Expand Up @@ -721,3 +721,21 @@ def spawn_debug(
return self._spawn_pip_isolated_job(
debug_command, log=log, pip_verbosity=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

def spawn_cache_remove(self, wheel_name_glob):
    # type: (str) -> Job
    """Spawn a `pip cache remove <wheel_name_glob>` job with stdout and stderr captured."""
    remove_command = ["cache", "remove", wheel_name_glob]
    captured_output = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return self._spawn_pip_isolated_job(args=remove_command, pip_verbosity=1, **captured_output)

def spawn_cache_list(self):
    # type: () -> Job
    """Spawn a `pip cache list --format abspath` job with stdout and stderr captured."""
    list_command = ["cache", "list", "--format", "abspath"]
    captured_output = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return self._spawn_pip_isolated_job(args=list_command, pip_verbosity=1, **captured_output)
Loading

0 comments on commit 6848200

Please sign in to comment.