Skip to content

Commit

Permalink
Fixup dir_dependencies to return all dependent PEX cache dirs as well.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsirois committed Oct 5, 2024
1 parent 19efcdb commit bdb3f11
Showing 1 changed file with 89 additions and 33 deletions.
122 changes: 89 additions & 33 deletions pex/cache/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import absolute_import

import itertools
import os.path
import sqlite3
from contextlib import closing, contextmanager
Expand All @@ -11,6 +12,7 @@
from pex.cache.dirs import (
AtomicCacheDir,
BootstrapDir,
CacheDir,
InstalledWheelDir,
UnzipDir,
UserCodeDir,
Expand All @@ -19,10 +21,9 @@
from pex.common import CopyMode
from pex.dist_metadata import ProjectNameAndVersion
from pex.typing import TYPE_CHECKING, cast
from pex.variables import ENV

if TYPE_CHECKING:
from typing import Dict, Iterator, List, Optional, Union
from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union

from pex.pex_info import PexInfo

Expand Down Expand Up @@ -73,12 +74,12 @@
@contextmanager
def _db_connection():
# type: () -> Iterator[sqlite3.Connection]
db_dir = os.path.join(ENV.PEX_ROOT, "data")
db_dir = CacheDir.DBS.path("deps")
with atomic_directory(db_dir) as atomic_dir:
if not atomic_dir.is_finalized():
with sqlite3.connect(os.path.join(atomic_dir.work_dir, "cache.db")) as conn:
with sqlite3.connect(os.path.join(atomic_dir.work_dir, "deps.db")) as conn:
conn.executescript(_SCHEMA).close()
with sqlite3.connect(os.path.join(db_dir, "cache.db")) as conn:
with sqlite3.connect(os.path.join(db_dir, "deps.db")) as conn:
conn.executescript(
"""
PRAGMA synchronous=NORMAL;
Expand Down Expand Up @@ -196,13 +197,87 @@ def record_venv(coon_or_cursor):
record_venv(conn).close()


def zipapp_deps(pex_dir):
# type: (UnzipDir) -> Iterator[Union[BootstrapDir, UserCodeDir, str, InstalledWheelDir]]
def _iter_wheel_dependents(
dependent_pex, # type: Union[UnzipDir, VenvDir]
conn, # type: sqlite3.Connection
wheel_install_hashes, # type: Sequence[str]
):
# type: (...) -> Iterator[Union[str, AtomicCacheDir]]

# N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit
# to an even lower chunk size to be safe: https://www.sqlite.org/limits.html
chunk_size = 250
for index in range(0, len(wheel_install_hashes), chunk_size):
chunk = wheel_install_hashes[index : index + chunk_size]
chunk_placeholder = ", ".join(itertools.repeat("?", len(chunk)))
with closing(
conn.execute(
"""
SELECT DISTINCT zipapps.pex_hash FROM zipapps
JOIN zipapp_deps ON zipapp_deps.pex_hash = zipapps.pex_hash
WHERE zipapp_deps.wheel_install_hash IN ({chunk})
""".format(
chunk=chunk_placeholder
),
chunk,
)
) as cursor:
for (pex_hash,) in cursor:
zipapp = UnzipDir.create(pex_hash)
if zipapp != dependent_pex:
yield zipapp

with closing(
conn.execute(
"""
SELECT DISTINCT venvs.short_hash, venvs.pex_hash, venvs.contents_hash FROM venvs
JOIN venv_deps ON venv_deps.venv_hash = venvs.short_hash
WHERE venv_deps.wheel_install_hash IN ({chunk})
""".format(
chunk=chunk_placeholder
),
chunk,
)
) as cursor:
for short_hash, pex_hash, contents_hash in cursor:
venv = VenvDir.create(pex_hash, contents_hash)
if venv != dependent_pex:
yield venv
yield VenvDir.short_path(short_hash, include_symlink=True)


def _iter_wheel_deps(
dependent_pex, # type: Union[UnzipDir, VenvDir]
conn, # type: sqlite3.Connection
wheels, # type: Iterator[Tuple[str, str, str]]
):
# type: (...) -> Iterator[Union[str, AtomicCacheDir]]

wheel_install_hashes = [] # type: List[str]
for wheel_name, wheel_install_hash, wheel_hash in wheels:
wheel_install_hashes.append(wheel_install_hash)
installed_wheel_dir = InstalledWheelDir.create(
wheel_name=wheel_name, wheel_hash=wheel_install_hash
)
if wheel_hash:
yield InstalledWheelDir.create(wheel_name=wheel_name, wheel_hash=wheel_hash)
yield installed_wheel_dir.path
else:
yield installed_wheel_dir

for dependent_dir in _iter_wheel_dependents(
dependent_pex=dependent_pex, conn=conn, wheel_install_hashes=wheel_install_hashes
):
yield dependent_dir


def zipapp_deps(unzip_dir):
# type: (UnzipDir) -> Iterator[Union[str, AtomicCacheDir]]
with _db_connection() as conn:
with closing(
conn.execute(
"SELECT bootstrap_hash, code_hash FROM zipapps WHERE pex_hash = ?",
[pex_dir.pex_hash],
[unzip_dir.pex_hash],
)
) as cursor:
bootstrap_hash, code_hash = cursor.fetchone()
Expand All @@ -217,22 +292,15 @@ def zipapp_deps(pex_dir):
JOIN zipapps ON zipapps.pex_hash = zipapp_deps.pex_hash
WHERE zipapps.pex_hash = ?
""",
[pex_dir.pex_hash],
[unzip_dir.pex_hash],
)
) as cursor:
for wheel_name, wheel_install_hash, wheel_hash in cursor:
installed_wheel_dir = InstalledWheelDir.create(
wheel_name=wheel_name, wheel_hash=wheel_install_hash
)
if wheel_hash:
yield InstalledWheelDir.create(wheel_name=wheel_name, wheel_hash=wheel_hash)
yield installed_wheel_dir.path
else:
yield installed_wheel_dir
for dep in _iter_wheel_deps(dependent_pex=unzip_dir, conn=conn, wheels=cursor):
yield dep


def venv_deps(venv_dir):
# type: (VenvDir) -> Iterator[Union[str, InstalledWheelDir]]
# type: (VenvDir) -> Iterator[Union[str, AtomicCacheDir]]
with _db_connection() as conn:
with closing(
conn.execute(
Expand All @@ -254,22 +322,10 @@ def venv_deps(venv_dir):
[short_hash],
)
) as cursor:
for wheel_name, wheel_install_hash, wheel_hash in cursor:
installed_wheel_dir = InstalledWheelDir.create(
wheel_name=wheel_name, wheel_hash=wheel_install_hash
)
if wheel_hash:
yield InstalledWheelDir.create(wheel_name=wheel_name, wheel_hash=wheel_hash)
yield installed_wheel_dir.path
else:
yield installed_wheel_dir
for dep in _iter_wheel_deps(dependent_pex=venv_dir, conn=conn, wheels=cursor):
yield dep


def dir_dependencies(pex_dir):
# type: (Union[UnzipDir, VenvDir]) -> Iterator[Union[str, AtomicCacheDir]]
return zipapp_deps(pex_dir) if isinstance(pex_dir, UnzipDir) else venv_deps(pex_dir)


def delete(pex_dir):
# type: (Union[UnzipDir, VenvDir]) -> None
pass

0 comments on commit bdb3f11

Please sign in to comment.