From e6ea967d46c8b3eb9b7f958908655b8eb9345043 Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Thu, 6 Jul 2023 18:46:44 -0400 Subject: [PATCH] squashed commit add change to do parallel zipping only, no crawling modify cli arg format for medusa-zip tool update cli arg format fix non-exhaustive CopyMode usage [BROKEN] add first run of complete medusa zip with cli arg! the resulting zip cannot be zipimported yet.... medusa zipping works great now, let's revert .zip() changes bump medusa options bump more medusa options use the merged medusa command lines now manage a cache of parallel intermediate zip generation jobs! small fix much closer to mergeable now working much more complex control flow between the medusa-zip cli move medusa zip to medusa.py medusa works for packed apps now too works for everything, but kind of crazy close stdin after writing to the child process factor out a ridiculous amount of boilerplate add back the non-medusa impl for packed layouts implement a "normal" version which uses atomic directories revert unintentional whitespace changes separate the serial and parallel pex creations remove the attempts at parallelism add --medusa-path remove unused code make the medusa hook work when not provided add back a tracer revert some changes that make things harder to read revert some changes i shouldn't need make medusa work with the medusa-zip package and not subprocesses! update after adding defaults in medusa-zip python package remove -f arg for resolving medusa-zip [BROKEN] possibly obsolete! fix cli arg add stitched layout create stitch copymode no initial stitch impl --- pex/common.py | 7 +- pex/pex_builder.py | 164 +++++++++++++++++++++++++++++++++++++++------ pex/ziputils.py | 38 ++++++++++- 3 files changed, 184 insertions(+), 25 deletions(-) diff --git a/pex/common.py b/pex/common.py index a4bada46a..e3e2bd601 100644 --- a/pex/common.py +++ b/pex/common.py @@ -25,6 +25,7 @@ if TYPE_CHECKING: from typing import ( Any, + BinaryIO, Callable, DefaultDict, Iterable, @@ -34,6 +35,7 @@ Set, Sized, Tuple, + Union, ) # We use the start of MS-DOS time, which is what zipfiles use (see section 4.4.6 of @@ -220,6 +222,7 @@ def _chmod(self, info, path): @contextlib.contextmanager def open_zip(path, *args, **kwargs): + # type: (Union[str, BinaryIO], Any, Any) -> Iterator[PermPreservingZipFile] """A contextmanager for zip files. Passes through positional and kwargs to zipfile.ZipFile. @@ -585,7 +588,7 @@ def delete(self): def zip( self, - filename, # type: str + output_file, # type: Union[str, BinaryIO] mode="w", # type: str deterministic_timestamp=False, # type: bool exclude_file=lambda _: False, # type: Callable[[str], bool] @@ -603,7 +606,7 @@ def zip( selected_files = self.files() compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED - with open_zip(filename, mode, compression) as zf: + with open_zip(output_file, mode, compression) as zf: def write_entry( filename, # type: str diff --git a/pex/pex_builder.py b/pex/pex_builder.py index 8805b1a05..a0bf002c6 100644 --- a/pex/pex_builder.py +++ b/pex/pex_builder.py @@ -7,6 +7,8 @@ import logging import os import shutil +import subprocess +from contextlib import contextmanager from pex import pex_warnings from pex.atomic_directory import atomic_directory @@ -20,6 +22,7 @@ safe_mkdir, safe_mkdtemp, safe_open, + temporary_dir, ) from pex.compatibility import commonpath, to_bytes from pex.compiler import Compiler @@ -28,6 +31,7 @@ from pex.environment import PEXEnvironment from pex.finders import get_entry_point_from_console_script, get_script_from_distributions from pex.interpreter import PythonInterpreter +from pex.jobs import Job from pex.layout import Layout from pex.orderedset import OrderedSet from pex.pex import PEX @@ -37,9 +41,10 @@ from pex.tracer import TRACER from pex.typing import TYPE_CHECKING from pex.util import CacheHelper +from pex.ziputils import ZipCommand if TYPE_CHECKING: - from typing import Dict, Optional + from typing import BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple class CopyMode(Enum["CopyMode.Value"]): @@ -88,14 +93,14 @@ def __maybe_run_venv__(pex, pex_root, pex_path): venv_dir = venv_dir( pex_file=pex, - pex_root=pex_root, + pex_root=pex_root, pex_hash={pex_hash!r}, has_interpreter_constraints={has_interpreter_constraints!r}, pex_path=pex_path, ) venv_pex = os.path.join(venv_dir, 'pex') if not __execute__ or not is_exe(venv_pex): - # Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter. + # Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter. return venv_dir TRACER.log('Executing venv PEX for {{}} at {{}}'.format(pex, venv_pex)) @@ -434,6 +439,7 @@ def set_header(self, header): self._header = header def _add_dist_dir(self, path, dist_name, fingerprint=None): + # type: (str, str, Optional[str]) -> str target_dir = os.path.join(self._pex_info.internal_cache, dist_name) if self._copy_mode == CopyMode.SYMLINK: self._copy_or_link(path, target_dir, label=dist_name) @@ -550,6 +556,7 @@ def _copy_or_link(self, src, dst, label=None): elif self._copy_mode == CopyMode.SYMLINK: self._chroot.symlink(src, dst, label) else: + assert self._copy_mode == CopyMode.LINK self._chroot.link(src, dst, label) def _prepare_bootstrap(self): @@ -769,31 +776,144 @@ def zip_cache_dir(path): os.path.join(internal_cache, location), ) - def _build_zipapp( - self, - filename, # type: str - deterministic_timestamp=False, # type: bool - compress=True, # type: bool - ): - # type: (...) -> None + def _cache_dists_for_stitching(self, compress): + # type: (bool) -> Dict[str, str] + merge_deps = {} # type: Dict[str, str] + with TRACER.timed("caching dists for stitched output", V=3): + for dist_label, fingerprint in self._pex_info.distributions.items(): + cache_key = "{}-{}".format( + fingerprint, "compressed" if compress else "uncompressed" + ) + cached_zip = os.path.join( + self._pex_info.pex_root, + "stitched_dists", + cache_key, + dist_label, + ) + with atomic_directory(os.path.dirname(cached_zip)) as atomic_zip_dir: + if not atomic_zip_dir.is_finalized(): + atomic_output_file = os.path.join( + atomic_zip_dir.work_dir, os.path.basename(cached_zip) + ) + with TRACER.timed("caching single dist {}".format(dist_label), V=3): + self._chroot.zip( + atomic_output_file, + labels=(dist_label,), + deterministic_timestamp=True, + compress=compress, + exclude_file=is_pyc_temporary_file, + ) + assert os.path.isfile(cached_zip) + merge_deps[dist_label] = cached_zip + + return merge_deps + + @contextmanager + def _concatenate_cached_entries(self, zip_cmd, deterministic_timestamp, compress): + # type: (ZipCommand, bool, bool) -> Iterator[BinaryIO] + merge_deps = self._cache_dists_for_stitching(compress=compress) + uncached_labels = sorted(frozenset(self._chroot.labels()) - frozenset(merge_deps.keys())) + + with TRACER.timed("synthesize zipapp", V=6), temporary_dir() as td: + concatenated_nonzip = os.path.join(td, "concatenated.broken-zip") + with open(concatenated_nonzip, "w+b") as concat_f: + with TRACER.timed("zipping up uncached sources", V=3): + self._chroot.zip( + concat_f, + deterministic_timestamp=deterministic_timestamp, + compress=compress, + labels=uncached_labels, + ) + + with TRACER.timed("concatenating cached dist zips", V=3): + # Sort the cached zips by the prefixes of the filenames they'll be + # inserting into the merged result, to get a deterministic output. + for _, path in sorted(merge_deps.items(), key=lambda x: x[0]): + with open(path, "rb") as f: + shutil.copyfileobj(f, concat_f) # type: ignore[misc] + + fixed_zip = os.path.join(td, "fixed.zip") + zip_cmd.fix_concatenated_zips(concatenated_nonzip, fixed_zip) + + with open(fixed_zip, "rb") as read_handle: + yield read_handle + + @contextmanager + def _prepare_executable_zipapp(self, filename): + # type: (str) -> Iterator[BinaryIO] with safe_open(filename, "wb") as pexfile: assert os.path.getsize(pexfile.name) == 0 pexfile.write(to_bytes("{}\n".format(self._shebang))) if self._header: pexfile.write(to_bytes(self._header)) - with TRACER.timed("Zipping PEX file."): + + yield pexfile + + chmod_plus_x(pexfile.name) + + def _uncached_zipapp( + self, + filename, # type: str + deterministic_timestamp, # type: bool + compress, # type: bool + ): + # type: (...) -> None + + # When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink distributions as + # pointers to installed wheel directories in ~/.pex/installed_wheels/... Since those + # installed wheels reside in a shared cache, they can be in-use by other processes and so + # their code may be in the process of being bytecode compiled as we attempt to zip up our + # chroot. Bytecode compilation produces ephemeral temporary pyc files that we should avoid + # copying since they are useless and inherently racy. + exclude_file = is_pyc_temporary_file + + with TRACER.timed("Zipping PEX file."), self._prepare_executable_zipapp( + filename + ) as pexfile: self._chroot.zip( - filename, - mode="a", + pexfile, deterministic_timestamp=deterministic_timestamp, - # When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink distributions - # as pointers to installed wheel directories in ~/.pex/installed_wheels/... Since - # those installed wheels reside in a shared cache, they can be in-use by other - # processes and so their code may be in the process of being bytecode compiled as we - # attempt to zip up our chroot. Bytecode compilation produces ephemeral temporary - # pyc files that we should avoid copying since they are useless and inherently - # racy. - exclude_file=is_pyc_temporary_file, compress=compress, + exclude_file=exclude_file, ) - chmod_plus_x(filename) + + def _build_zipapp( + self, + filename, # type: str + deterministic_timestamp=False, # type: bool + compress=True, # type: bool + ): + # type: (...) -> None + # Naively creating a compressed zipapp with many downloaded distributions would perform + # a lot of I/O on each pex invocation and spend a lot of CPU on compression. While + # `--no-compress` runs significantly faster, the result may also be over twice as large. + should_try_synthesizing_from_cache = bool(self._pex_info.distributions) and compress + if not should_try_synthesizing_from_cache: + self._uncached_zipapp( + filename, deterministic_timestamp=deterministic_timestamp, compress=compress + ) + return + + # However, if we have access to the `zip` command, we can employ a caching strategy. + zip_cmd = ZipCommand.find() + if zip_cmd is None: + TRACER.log( + "`zip` command was not found, so compressed dist caches could not be used", + V=1, + ) + self._uncached_zipapp( + filename, deterministic_timestamp=deterministic_timestamp, compress=compress + ) + return + + with TRACER.timed( + "cache dists and synthesize zipapp", V=9 + ), self._concatenate_cached_entries( + zip_cmd, + deterministic_timestamp=deterministic_timestamp, + compress=compress, + ) as concatenated_zip_f: + with TRACER.timed( + "copying synthesized concatenated zip to output file", V=9 + ), self._prepare_executable_zipapp(filename) as pexfile: + shutil.copyfileobj(concatenated_zip_f, pexfile) # type: ignore[misc] diff --git a/pex/ziputils.py b/pex/ziputils.py index 7e2cd0b1c..12b78696f 100644 --- a/pex/ziputils.py +++ b/pex/ziputils.py @@ -7,11 +7,14 @@ import os import shutil import struct +import subprocess +from pex.jobs import Job +from pex.tracer import TRACER from pex.typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import BinaryIO, Optional + from typing import Any, BinaryIO, ClassVar, Optional import attr # vendor:skip else: @@ -242,3 +245,36 @@ def isolate_zip(self, out_fp): if self.has_header: in_fp.seek(self.header_size, os.SEEK_SET) shutil.copyfileobj(in_fp, out_fp) + + +@attr.s(frozen=True) +class ZipCommand(object): + exe_path = attr.ib() # type: str + + _cached = None # type: ClassVar[Optional[ZipCommand]] + + @classmethod + def find(cls): + # type: () -> Optional[ZipCommand] + if cls._cached is not None: + return cls._cached + + import distutils.spawn + + zip_path = distutils.spawn.find_executable("zip") + if zip_path: + cls._cached = cls(exe_path=zip_path) + return cls._cached + + def __call__(self, *args, **kwargs): + # type: (str, Any) -> Job + command = [self.exe_path] + list(args) + zip_proc = subprocess.Popen( + command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, encoding="utf-8", **kwargs + ) + return Job(command, zip_proc) + + def fix_concatenated_zips(self, concatenated_zips_path, output_path): + # type: (str, str) -> None + with TRACER.timed("fixing up concatenated zips with `zip -FF`"): + self("-FF", concatenated_zips_path, "--out", output_path).wait()