diff --git a/pex/bin/pex.py b/pex/bin/pex.py
index 691f06eac..0d83994e0 100755
--- a/pex/bin/pex.py
+++ b/pex/bin/pex.py
@@ -164,6 +164,23 @@ def configure_clp_pex_options(parser):
         ),
     )

+    group.add_argument(
+        "--cache-dists",
+        "--no-cache-dists",
+        dest="cache_dists",
+        default=None,
+        action=HandleBoolAction,
+        help=(
+            "Whether to zip up each dist contained in the output PEX file into a fingerprinted "
+            "cache directory to speed up later PEX file builds. For `--layout packed`, this "
+            "behavior is enabled by default. "
+            "For `--layout zipapp`, this synthesizes the zip file from those cached zips with an "
+            "experimental zip merging technique, so this flag is disabled by default when building "
+            "a zipapp. This will re-use the same caches as `--layout packed`, so creating a "
+            "zipapp or packed PEX file from the same inputs will only populate the cache once. "
+            "This flag and behavior do not apply to other layouts."
+        ),
+    )
     group.add_argument(
         "--compress",
         "--compressed",
@@ -175,7 +192,11 @@ def configure_clp_pex_options(parser):
         action=HandleBoolAction,
         help=(
             "Whether to compress zip entries when creating either a zipapp PEX file or a packed "
-            "PEX's bootstrap and dependency zip files. Does nothing for loose layout PEXes."
+            "PEX's bootstrap and dependency zip files. "
+            "Uncompressed PEX files are much faster to create from an empty cache, but are no "
+            "faster after the cache has been populated, and uncompressed cache entries will "
+            "consume many times more space on disk. "
+            "Does nothing for loose layout PEXes."
         ),
     )

@@ -200,7 +221,7 @@ def configure_clp_pex_options(parser):
         action=HandleVenvAction,
         help="Convert the pex file to a venv before executing it. If 'prepend' or 'append' is "
         "specified, then all scripts and console scripts provided by distributions in the pex file "
-        "will be added to the PATH in the corresponding position. If the the pex file will be run "
+        "will be added to the PATH in the corresponding position. If the pex file will be run "
         "multiple times under a stable runtime PEX_ROOT, the venv creation will only be done once "
         "and subsequent runs will enjoy lower startup latency.",
     )
@@ -282,10 +303,12 @@ def configure_clp_pex_options(parser):
         dest="compile",
         default=False,
         action=HandleBoolAction,
-        help="Compiling means that the built pex will include .pyc files, which will result in "
-        "slightly faster startup performance. However, compiling means that the generated pex "
+        help="Compiling means that the built PEX will include .pyc files, which will result in "
+        "slightly faster startup performance. However, compiling means that the generated PEX "
         "likely will not be reproducible, meaning that if you were to run `./pex -o` with the "
-        "same inputs then the new pex would not be byte-for-byte identical to the original.",
+        "same inputs then the new PEX would not be byte-for-byte identical to the original. "
+        "Note that all PEX files are now unzipped and compiled when first executed, so this "
+        "flag only affects the startup performance of the first execution.",
     )

     group.add_argument(
@@ -294,10 +317,14 @@ def configure_clp_pex_options(parser):
         dest="use_system_time",
         default=False,
         action=HandleBoolAction,
-        help="Use the current system time to generate timestamps for the new pex. Otherwise, Pex "
-        "will use midnight on January 1, 1980. By using system time, the generated pex "
-        "will not be reproducible, meaning that if you were to run `./pex -o` with the "
-        "same inputs then the new pex would not be byte-for-byte identical to the original.",
+        help="Convert modification times from the filesystem into timestamps for any zip file "
+        "entries. Otherwise, Pex will use midnight on January 1, 1980. By using system time, the "
+        "generated PEX will not be reproducible, meaning that if you were to run `./pex -o` with "
+        "the same inputs then the new PEX would not be byte-for-byte identical to the original. "
+        "Note that zip file entries synthesized from the pex cache (including any resolved "
+        "distributions) will always use the reproducible timestamp regardless of this flag. "
+        "Any unzipped output files will retain the timestamps of their sources regardless of this "
+        "flag, although this will not affect their checksum.",
     )

     group.add_argument(
@@ -949,6 +976,7 @@ def do_main(
         deterministic_timestamp=not options.use_system_time,
         layout=options.layout,
         compress=options.compress,
+        cache_dists=options.cache_dists,
     )
     if options.seed != Seed.NONE:
         seed_info = seed_cache(
diff --git a/pex/common.py b/pex/common.py
index 7f9728638..d8906885e 100644
--- a/pex/common.py
+++ b/pex/common.py
@@ -6,6 +6,7 @@
 import atexit
 import contextlib
 import errno
+import io
 import itertools
 import os
 import re
@@ -38,6 +39,8 @@
         Union,
     )

+    _DateTime = Tuple[int, int, int, int, int, int]
+

 # We use the start of MS-DOS time, which is what zipfiles use (see section 4.4.6 of
 # https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT).
@@ -137,6 +140,23 @@ def do_copy():
     do_copy()


+def copy_file_range(source, destination, length, buffer_size=io.DEFAULT_BUFFER_SIZE):
+    # type: (io.BufferedIOBase, io.BufferedIOBase, int, int) -> None
+    """Implementation of shutil.copyfileobj() that only copies exactly `length` bytes."""
+    # We require a BufferedIOBase in order to avoid handling short reads or writes.
+    remaining_length = length
+    if buffer_size > length:
+        buffer_size = length
+    cur_buf = bytearray(buffer_size)
+    while remaining_length > buffer_size:
+        assert source.readinto(cur_buf) == buffer_size
+        assert destination.write(cur_buf) == buffer_size
+        remaining_length -= buffer_size
+    remainder = source.read(remaining_length)
+    assert len(remainder) == remaining_length
+    assert destination.write(remainder) == remaining_length
+
+
 # See http://stackoverflow.com/questions/2572172/referencing-other-modules-in-atexit
 class MktempTeardownRegistry(object):
     def __init__(self):
@@ -173,7 +193,14 @@ class ZipEntry(namedtuple("ZipEntry", ["info", "data"])):
         pass

     @classmethod
-    def zip_entry_from_file(cls, filename, arcname=None, date_time=None):
+    def zip_entry_from_file(
+        cls,
+        filename,  # type: str
+        arcname=None,  # type: Optional[str]
+        date_time=None,  # type: Optional[Tuple[int, ...]]
+        compression=zipfile.ZIP_STORED,  # type: int
+    ):
+        # type: (...) -> PermPreservingZipFile.ZipEntry
         """Construct a ZipEntry for a file on the filesystem.

         Usually a similar `zip_info_from_file` method is provided by `ZipInfo`, but it is not
@@ -192,16 +219,20 @@ def zip_entry_from_file(cls, filename, arcname=None, date_time=None):
             arcname += "/"
         if date_time is None:
             date_time = time.localtime(st.st_mtime)
-        zinfo = zipfile.ZipInfo(filename=arcname, date_time=date_time[:6])
+        zinfo = zipfile.ZipInfo(filename=arcname, date_time=cast("_DateTime", date_time[:6]))
         zinfo.external_attr = (st.st_mode & 0xFFFF) << 16  # Unix attributes
         if isdir:
             zinfo.file_size = 0
             zinfo.external_attr |= 0x10  # MS-DOS directory flag
+            # Always store directories uncompressed, because they are empty but take up 2 bytes
+            # when compressed.
             zinfo.compress_type = zipfile.ZIP_STORED
             data = b""
         else:
             zinfo.file_size = st.st_size
-            zinfo.compress_type = zipfile.ZIP_DEFLATED
+            # File contents may be stored compressed or uncompressed. Uncompressed is
+            # significantly faster to write, but caching makes up for that.
+            zinfo.compress_type = compression
             with open(filename, "rb") as fp:
                 data = fp.read()
         return cls.ZipEntry(info=zinfo, data=data)
@@ -281,18 +312,32 @@ def safe_mkdir(directory, clean=False):
     return directory


+def _ensure_parent(filename):
+    # type: (str) -> None
+    parent_dir = os.path.dirname(filename)
+    if parent_dir:
+        safe_mkdir(parent_dir)
+
+
 def safe_open(filename, *args, **kwargs):
     """Safely open a file.

     ``safe_open`` ensures that the directory components leading up the specified file have been
     created first.
     """
-    parent_dir = os.path.dirname(filename)
-    if parent_dir:
-        safe_mkdir(parent_dir)
+    _ensure_parent(filename)
     return open(filename, *args, **kwargs)  # noqa: T802


+def safe_io_open(filename, *args, **kwargs):
+    # type: (str, Any, Any) -> io.IOBase
+    """``safe_open()``, but using ``io.open()`` instead.
+
+    With the right arguments, this ensures the result produces a buffered file handle on py2."""
+    _ensure_parent(filename)
+    return cast("io.IOBase", io.open(filename, *args, **kwargs))
+
+
 def safe_delete(filename):
     # type: (str) -> None
     """Delete a file safely.
@@ -608,9 +653,13 @@ def delete(self):
         # type: () -> None
         shutil.rmtree(self.chroot)

+    # This directory traversal, file I/O, and compression can be made faster with complex
+    # parallelism and pipelining in a compiled language, but the result is much harder to package,
+    # and is still less performant than effective caching. See investigation in
+    # https://github.com/pantsbuild/pex/issues/2158 and https://github.com/pantsbuild/pex/pull/2175.
     def zip(
         self,
-        filename,  # type: str
+        output_file,  # type: Union[str, io.IOBase, io.BufferedRandom]
         mode="w",  # type: str
         deterministic_timestamp=False,  # type: bool
         exclude_file=lambda _: False,  # type: Callable[[str], bool]
@@ -628,7 +677,7 @@ def zip(
             selected_files = self.files()

         compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
-        with open_zip(filename, mode, compression) as zf:
+        with open_zip(output_file, mode, compression) as zf:

             def write_entry(
                 filename,  # type: str
@@ -638,11 +687,12 @@ def write_entry(
                 zip_entry = zf.zip_entry_from_file(
                     filename=filename,
                     arcname=os.path.relpath(arcname, strip_prefix) if strip_prefix else arcname,
-                    date_time=DETERMINISTIC_DATETIME.timetuple()
-                    if deterministic_timestamp
-                    else None,
+                    date_time=(
+                        DETERMINISTIC_DATETIME.timetuple() if deterministic_timestamp else None
+                    ),
+                    compression=compression,
                 )
-                zf.writestr(zip_entry.info, zip_entry.data, compression)
+                zf.writestr(zip_entry.info, zip_entry.data)

             def get_parent_dir(path):
                 # type: (str) -> Optional[str]
diff --git a/pex/pex_builder.py b/pex/pex_builder.py
index 3f13b1545..58a0f314a 100644
--- a/pex/pex_builder.py
+++ b/pex/pex_builder.py
@@ -7,6 +7,7 @@
 import logging
 import os
 import shutil
+import zipfile

 from pex import pex_warnings
 from pex.atomic_directory import atomic_directory
@@ -17,9 +18,9 @@
     is_pyc_temporary_file,
     safe_copy,
     safe_delete,
+    safe_io_open,
     safe_mkdir,
     safe_mkdtemp,
-    safe_open,
 )
 from pex.compatibility import commonpath, to_bytes
 from pex.compiler import Compiler
@@ -35,11 +36,13 @@
 from pex.sh_boot import create_sh_boot_script
 from pex.targets import Targets
 from pex.tracer import TRACER
-from pex.typing import TYPE_CHECKING
+from pex.typing import TYPE_CHECKING, cast
 from pex.util import CacheHelper
+from pex.ziputils import MergeableZipFile, buffered_zip_archive

 if TYPE_CHECKING:
-    from typing import Dict, Optional
+    import io
+    from typing import ClassVar, Dict, Iterable, Optional, Tuple


 # N.B.: __file__ will be relative when this module is loaded from a "" `sys.path` entry under
@@ -95,14 +98,14 @@ def __maybe_run_venv__(pex, pex_root, pex_path):

     venv_dir = venv_dir(
         pex_file=pex,
-        pex_root=pex_root, 
+        pex_root=pex_root,
         pex_hash={pex_hash!r},
         has_interpreter_constraints={has_interpreter_constraints!r},
         pex_path=pex_path,
     )
     venv_pex = os.path.join(venv_dir, 'pex')
     if not __execute__ or not is_exe(venv_pex):
-        # Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter. 
+        # Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter.
         return venv_dir

     TRACER.log('Executing venv PEX for {{}} at {{}}'.format(pex, venv_pex))
@@ -441,6 +444,7 @@ def set_header(self, header):
         self._header = header

     def _add_dist_dir(self, path, dist_name, fingerprint=None):
+        # type: (str, str, Optional[str]) -> str
         target_dir = os.path.join(self._pex_info.internal_cache, dist_name)
         if self._copy_mode == CopyMode.SYMLINK:
             self._copy_or_link(path, target_dir, label=dist_name)
@@ -557,6 +561,7 @@ def _copy_or_link(self, src, dst, label=None):
         elif self._copy_mode == CopyMode.SYMLINK:
             self._chroot.symlink(src, dst, label)
         else:
+            assert self._copy_mode == CopyMode.LINK
             self._chroot.link(src, dst, label)

     def _prepare_bootstrap(self):
@@ -633,12 +638,13 @@ def build(
         deterministic_timestamp=False,  # type: bool
         layout=Layout.ZIPAPP,  # type: Layout.Value
         compress=True,  # type: bool
+        cache_dists=None,  # type: Optional[bool]
     ):
         # type: (...) -> None
         """Package the PEX application.

         By default, the PEX is packaged as a zipapp for ease of shipping as a single file, but it
-        can also be packaged in spread mode for efficiency of syncing over the network
+        can also be packaged in a packed layout for efficiency of syncing over the network
         incrementally.

         :param path: The path where the PEX should be stored.
@@ -647,6 +653,8 @@ def build(
         :param layout: The layout to use for the PEX.
         :param compress: Whether to compress zip entries when building to a layout that uses zip
                          files.
+        :param cache_dists: For ``Layout.PACKED`` and ``Layout.ZIPAPP``, whether to zip up each dist
+                            into the pex cache. Otherwise ignored.

         If the PEXBuilder is not yet frozen, it will be frozen by ``build``. This renders the
         PEXBuilder immutable.
@@ -664,17 +672,34 @@ def build(
         else:
             shutil.rmtree(tmp_pex, True)

+        if not compress:
+            if cache_dists is True:
+                pex_warnings.warn(
+                    "--cache-dists provides very little speedup when used with --no-compress, "
+                    "while generating extremely large cache entries."
+                )
+
         if layout == Layout.LOOSE:
             shutil.copytree(self.path(), tmp_pex)
         elif layout == Layout.PACKED:
+            # Default to enabling the dist cache for packed layout.
+            if cache_dists is None:
+                cache_dists = True
             self._build_packedapp(
                 dirname=tmp_pex,
                 deterministic_timestamp=deterministic_timestamp,
                 compress=compress,
+                cache_dists=cache_dists,
             )
         else:
+            # Default to disabling the dist cache for zipapps.
+            if cache_dists is None:
+                cache_dists = False
             self._build_zipapp(
-                filename=tmp_pex, deterministic_timestamp=deterministic_timestamp, compress=compress
+                filename=tmp_pex,
+                deterministic_timestamp=deterministic_timestamp,
+                compress=compress,
+                cache_dists=cache_dists,
             )

         if os.path.isdir(path):
@@ -702,108 +727,290 @@ def set_sh_boot_script(
         )
         self.set_header(script)

+    def _setup_pex_info(self):
+        # type: () -> Tuple[PexInfo, str]
+        pex_info = self._pex_info.copy()
+        pex_info.update(PexInfo.from_env())
+
+        bootstrap_hash = pex_info.bootstrap_hash
+        if bootstrap_hash is None:
+            raise AssertionError(
+                "Expected bootstrap_hash to be populated for {}.".format(self._pex_info)
+            )
+
+        return (pex_info, bootstrap_hash)
+
+    _DIRECT_SOURCE_LABELS = (
+        "executable",
+        "importhook",
+        "main",
+        "manifest",
+        "resource",
+        "source",
+    )  # type: ClassVar[Iterable[str]]
+
+    @staticmethod
+    def _cache_root(pex_info, cache_dists):
+        # type: (PexInfo, bool) -> str
+        if cache_dists:
+            return pex_info.pex_root
+        return safe_mkdtemp(suffix="pex-zipped-dists")
+
     def _build_packedapp(
         self,
         dirname,  # type: str
         deterministic_timestamp=False,  # type: bool
         compress=True,  # type: bool
+        cache_dists=True,  # type: bool
     ):
         # type: (...) -> None
-
-        pex_info = self._pex_info.copy()
-        pex_info.update(PexInfo.from_env())
+        pex_info, bootstrap_hash = self._setup_pex_info()
+        cache_root = self._cache_root(pex_info, cache_dists)

         # Include user sources, PEX-INFO and __main__ as loose files in src/.
-        for fileset in ("executable", "importhook", "main", "manifest", "resource", "source"):
-            for f in self._chroot.filesets.get(fileset, ()):
-                dest = os.path.join(dirname, f)
-                safe_mkdir(os.path.dirname(dest))
-                safe_copy(os.path.realpath(os.path.join(self._chroot.chroot, f)), dest)
-
-        # Pex historically only supported compressed zips in packed layout, so we don't disturb the
-        # old cache structure for those zips and instead just use a subdir for un-compressed zips.
-        # This works for our two zip caches (we'll have no collisions with legacy compressed zips)
-        # since the bootstrap zip has a known name that is not "un-compressed" and "un-compressed"
-        # is not a valid wheel name either.
-        def zip_cache_dir(path):
-            # type: (str) -> str
-            if compress:
-                return path
-            return os.path.join(path, "un-compressed")
+        with TRACER.timed("copying over uncached sources", V=9):
+            for fileset in self._DIRECT_SOURCE_LABELS:
+                for f in self._chroot.filesets.get(fileset, ()):
+                    dest = os.path.join(dirname, f)
+                    safe_mkdir(os.path.dirname(dest))
+                    safe_copy(os.path.realpath(os.path.join(self._chroot.chroot, f)), dest)

         # Zip up the bootstrap which is constant for a given version of Pex.
-        bootstrap_hash = pex_info.bootstrap_hash
-        if bootstrap_hash is None:
-            raise AssertionError(
-                "Expected bootstrap_hash to be populated for {}.".format(self._pex_info)
-            )
-        cached_bootstrap_zip_dir = zip_cache_dir(
-            os.path.join(pex_info.pex_root, "bootstrap_zips", bootstrap_hash)
-        )
-        with atomic_directory(cached_bootstrap_zip_dir) as atomic_bootstrap_zip_dir:
-            if not atomic_bootstrap_zip_dir.is_finalized():
-                self._chroot.zip(
-                    os.path.join(atomic_bootstrap_zip_dir.work_dir, pex_info.bootstrap),
-                    deterministic_timestamp=deterministic_timestamp,
-                    exclude_file=is_pyc_temporary_file,
-                    strip_prefix=pex_info.bootstrap,
-                    labels=("bootstrap",),
-                    compress=compress,
-                )
-        safe_copy(
-            os.path.join(cached_bootstrap_zip_dir, pex_info.bootstrap),
-            os.path.join(dirname, pex_info.bootstrap),
+        cached_bootstrap_zip = self._get_or_insert_into_bootstrap_cache(
+            cache_root,
+            pex_info,
+            bootstrap_hash,
+            compress=compress,
         )
+        safe_copy(cached_bootstrap_zip, os.path.join(dirname, pex_info.bootstrap))

         # Zip up each installed wheel chroot, which is constant for a given version of a
         # wheel.
         if pex_info.distributions:
             internal_cache = os.path.join(dirname, pex_info.internal_cache)
             os.mkdir(internal_cache)
-            for location, fingerprint in pex_info.distributions.items():
-                cached_installed_wheel_zip_dir = zip_cache_dir(
-                    os.path.join(pex_info.pex_root, "packed_wheels", fingerprint)
+            for dist_label, fingerprint in pex_info.distributions.items():
+                cached_packed_zip = self._get_or_insert_into_packed_wheel_cache(
+                    cache_root,
+                    pex_info,
+                    dist_label,
+                    fingerprint,
+                    compress=compress,
                 )
-                with atomic_directory(cached_installed_wheel_zip_dir) as atomic_zip_dir:
-                    if not atomic_zip_dir.is_finalized():
-                        self._chroot.zip(
-                            os.path.join(atomic_zip_dir.work_dir, location),
-                            deterministic_timestamp=deterministic_timestamp,
-                            exclude_file=is_pyc_temporary_file,
-                            strip_prefix=os.path.join(pex_info.internal_cache, location),
-                            labels=(location,),
-                            compress=compress,
-                        )
-                safe_copy(
-                    os.path.join(cached_installed_wheel_zip_dir, location),
-                    os.path.join(internal_cache, location),
-                )
+                safe_copy(cached_packed_zip, os.path.join(internal_cache, dist_label))
+
+    @staticmethod
+    def get_zip_cache_subdir(base_path, compress):
+        # type: (str, bool) -> str
+        """Get the cache location for the entry at `base_path` depending on the value of `compress`.
+
+        Pex historically only supported compressed zips in packed layout, so we don't disturb the
+        old cache structure for those zips and instead just use a subdir for un-compressed zips.
+        This works for our two zip caches (we'll have no collisions with legacy compressed zips)
+        since the bootstrap zip has a known name that is not "un-compressed" and "un-compressed"
+        is not a valid wheel name either.
+ """ + if compress: + return base_path + return os.path.join(base_path, "un-compressed") + + def _get_or_insert_into_bootstrap_cache( + self, + cache_root, # type: str + pex_info, # type: PexInfo + bootstrap_hash, # type: str + compress, # type: bool + ): + # type: (...) -> str + cached_bootstrap_zip_dir = self.get_zip_cache_subdir( + os.path.join(cache_root, "bootstrap_zips", bootstrap_hash), + compress=compress, + ) + with atomic_directory(cached_bootstrap_zip_dir) as atomic_bootstrap_zip_dir: + if not atomic_bootstrap_zip_dir.is_finalized(): + with TRACER.timed("generating bootstrap cache for hash {}".format(bootstrap_hash)): + self._chroot.zip( + os.path.join(atomic_bootstrap_zip_dir.work_dir, pex_info.bootstrap), + deterministic_timestamp=True, + exclude_file=is_pyc_temporary_file, + strip_prefix=pex_info.bootstrap, + labels=("bootstrap",), + compress=compress, + ) + return os.path.join(cached_bootstrap_zip_dir, pex_info.bootstrap) + + def _get_or_insert_into_packed_wheel_cache( + self, + cache_root, # type: str + pex_info, # type: PexInfo + dist_label, # type: str + fingerprint, # type: str + compress, # type: bool + ): + # type: (...) -> str + cached_installed_wheel_zip_dir = self.get_zip_cache_subdir( + os.path.join(cache_root, "packed_wheels", fingerprint), + compress=compress, + ) + with atomic_directory(cached_installed_wheel_zip_dir) as atomic_zip_dir: + if not atomic_zip_dir.is_finalized(): + with TRACER.timed("generating packed wheel cache for {}".format(dist_label)): + self._chroot.zip( + os.path.join(atomic_zip_dir.work_dir, dist_label), + # These will be getting served from a cache regardless of the value of + # deterministic_timestamp in the caller, so best to explicitly mark them as + # synthetic in some way to avoid timestamps changing depending on the + # cache contents. + deterministic_timestamp=True, + exclude_file=is_pyc_temporary_file, + strip_prefix=os.path.join(pex_info.internal_cache, dist_label), + labels=(dist_label,), + compress=compress, + ) + return os.path.join(cached_installed_wheel_zip_dir, dist_label) + + def _build_zipapp_without_cached_dists( + self, + pexfile, # type: io.IOBase + deterministic_timestamp, # type: bool + compress, # type: bool + ): + # type: (...) -> None + """Write the chroot into an executable zipapp, appending to the given file handle.""" + self._chroot.zip( + pexfile, + mode="a", + deterministic_timestamp=True, + # When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink + # distributions as pointers to installed wheel directories in + # ~/.pex/installed_wheels/... Since those installed wheels reside in + # a shared cache, they can be in-use by other processes and so their code + # may be in the process of being bytecode compiled as we attempt to zip up + # our chroot. Bytecode compilation produces ephemeral temporary pyc files + # that we should avoid copying since they are useless and inherently racy. + exclude_file=is_pyc_temporary_file, + compress=compress, + # Iterate over all the files in the Chroot, including descending into symlinks to + # unpacked third-party dists in the pex cache. + labels=None, + ) + + def _build_zipapp_with_cached_dists( + self, + pexfile, # type: io.BufferedRandom + deterministic_timestamp, # type: bool + compress, # type: bool + ): + # type: (...) -> None + """Engage our hacky solution to reuse the same caches as for --layout packed. + + This reads from a single compressed file per cached resource instead of + traversing any directory trees. 
+        reduce the total number of bytes to read and avoid a lot of single-threaded
+        computation, because the compressed entries are simply copied over as-is.
+
+        Note that these cached zips are created in the --layout packed format, without
+        the .bootstrap/ or .deps/ prefixes we need to form a proper Pex zipapp
+        layout. Our zip file merging solution edits each entry's .filename with the
+        appropriate prefix, but we will still need to generate intermediate directory
+        entries before adding the prefixed files in order to unzip correctly.
+        """
+        pex_info, bootstrap_hash = self._setup_pex_info()
+        cache_root = self._cache_root(pex_info, True)
+
+        # (1) Merge in all the cacheable dists.
+        with MergeableZipFile(pexfile, mode="a") as zf:
+            # (1.1) Add the single bootstrap dir.
+            with TRACER.timed("adding bootstrap dir", V=9):
+                # Generate ".bootstrap/".
+                zf.mkdir(pex_info.bootstrap)
+                cached_bootstrap_zip = self._get_or_insert_into_bootstrap_cache(
+                    cache_root,
+                    pex_info,
+                    bootstrap_hash,
+                    compress=compress,
+                )
+                with buffered_zip_archive(cached_bootstrap_zip) as bootstrap_zf:
+                    zf.merge_archive(bootstrap_zf, name_prefix=pex_info.bootstrap)
+
+            # (1.2) Add a subdirectory for each resolved dist.
+            with TRACER.timed("adding dependencies", V=9):
+                # Generate ".deps/".
+                zf.mkdir(pex_info.internal_cache)
+
+                # Sort the dict keys for a deterministic output. This also ensures that the
+                # contents of the .deps/ subdirectory are lexicographically sorted, which
+                # corresponds to the order they would have been added to the zip without
+                # any caching.
+                for dist_label in sorted(pex_info.distributions.keys()):
+                    fingerprint = pex_info.distributions[dist_label]
+
+                    dist_prefix = os.path.join(pex_info.internal_cache, dist_label)
+                    # Generate e.g. ".deps/Keras-2.4.3-py2.py3-none-any.whl/".
+                    zf.mkdir(dist_prefix)
+
+                    cached_packed_zip = self._get_or_insert_into_packed_wheel_cache(
+                        cache_root,
+                        pex_info,
+                        dist_label,
+                        fingerprint,
+                        compress=compress,
+                    )
+                    with buffered_zip_archive(cached_packed_zip) as packed_zf:
+                        zf.merge_archive(packed_zf, name_prefix=dist_prefix)
+
+        # (2) Zip up everything that won't be retrieved from a cache first.
+        with TRACER.timed("zipping up uncached sources", V=9):
+            # Reuse the file handle again.
+            self._chroot.zip(
+                pexfile,
+                mode="a",
+                # These files are the only ones that might have a nonzero timestamp. All
+                # entries copied from cache are assigned the zero timestamp, so their
+                # checksums won't depend on the state of the pex cache.
+                deterministic_timestamp=deterministic_timestamp,
+                compress=compress,
+                # This includes __main__.py, which is what the python interpreter first looks for
+                # when executing a zipapp. We make sure it's near the central directory records at
+                # the end of the file, so that after the interpreter parses the central directory,
+                # it doesn't need to seek very far to find the file to execute.
+                labels=self._DIRECT_SOURCE_LABELS,
+            )

     def _build_zipapp(
         self,
         filename,  # type: str
         deterministic_timestamp=False,  # type: bool
         compress=True,  # type: bool
+        cache_dists=False,  # type: bool
     ):
         # type: (...) -> None
-        with safe_open(filename, "wb") as pexfile:
-            assert os.path.getsize(pexfile.name) == 0
-            pexfile.write(to_bytes("{}\n".format(self._shebang)))
-            if self._header:
-                pexfile.write(to_bytes(self._header))
         with TRACER.timed("Zipping PEX file."):
-            self._chroot.zip(
-                filename,
-                mode="a",
-                deterministic_timestamp=deterministic_timestamp,
-                # When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink distributions
-                # as pointers to installed wheel directories in ~/.pex/installed_wheels/... Since
-                # those installed wheels reside in a shared cache, they can be in-use by other
-                # processes and so their code may be in the process of being bytecode compiled as we
-                # attempt to zip up our chroot. Bytecode compilation produces ephemeral temporary
-                # pyc files that we should avoid copying since they are useless and inherently
-                # racy.
-                exclude_file=is_pyc_temporary_file,
-                compress=compress,
-            )
+            # We will be reusing the same file handle to wrap with ZipFile, which needs to be able
+            # to read the file contents when instantiated, so it can check for existing zip
+            # file data. Hence, we open with "w+b".
+            with safe_io_open(filename, "w+b") as pexfile:
+                # Write shebang line and any header script to a non-zip file handle.
+                pexfile.write(to_bytes("{}\n".format(self._shebang)))
+                if self._header:
+                    pexfile.write(to_bytes(self._header))
+
+                # Reuse the file handle to zip into. This isn't necessary (we could close and reopen
+                # it), but it avoids unnecessarily flushing to disk. When we wrap an existing handle
+                # like this, the ZipFile class will re-parse the central directory records at the
+                # end of the file and then reset the cursor to the beginning of the central
+                # directory records.
+                if cache_dists:
+                    self._build_zipapp_with_cached_dists(
+                        cast("io.BufferedRandom", pexfile),
+                        deterministic_timestamp=deterministic_timestamp,
+                        compress=compress,
+                    )
+                else:
+                    self._build_zipapp_without_cached_dists(
+                        pexfile,
+                        deterministic_timestamp=deterministic_timestamp,
+                        compress=compress,
+                    )
+
         chmod_plus_x(filename)
diff --git a/pex/ziputils.py b/pex/ziputils.py
index 7e2cd0b1c..d43b99382 100644
--- a/pex/ziputils.py
+++ b/pex/ziputils.py
@@ -5,13 +5,18 @@

 import io
 import os
+import re
 import shutil
 import struct
+import subprocess
+import zipfile

+from pex.common import DETERMINISTIC_DATETIME, copy_file_range, safe_io_open
 from pex.typing import TYPE_CHECKING

 if TYPE_CHECKING:
-    from typing import BinaryIO, Optional
+    from io import BufferedIOBase
+    from typing import BinaryIO, Optional, Union

     import attr  # vendor:skip
 else:
@@ -242,3 +247,93 @@ def isolate_zip(self, out_fp):
         if self.has_header:
             in_fp.seek(self.header_size, os.SEEK_SET)
         shutil.copyfileobj(in_fp, out_fp)
+
+
+def buffered_zip_archive(filename):
+    # type: (str) -> zipfile.ZipFile
+    """Return a ``zipfile.ZipFile`` instance backed by a buffered file handle.
+
+    This is required by ``MergeableZipFile#merge_archive()`` to copy over exactly the right number
+    of bytes (via ``pex.common.copy_file_range()``)."""
+    buffered_handle = safe_io_open(filename, mode="rb")
+    return zipfile.ZipFile(buffered_handle, mode="r")  # type: ignore[arg-type]
+
+
+# TODO: merge this with PermPreservingZipFile in pex.common?
+class MergeableZipFile(zipfile.ZipFile):
+    """A zip file that can copy over the contents of other zips without decompression.
+
+    This is used to synthesize --layout zipapp PEX files from other zip files in the pex cache."""
+
+    def __init__(self, *args, **kwargs):
+        kwargs.setdefault("allowZip64", True)
+        super(MergeableZipFile, self).__init__(*args, **kwargs)
+
+    def mkdir(self, name, mode=511):
+        # type: (str, int) -> None
+        """Polyfill for ZipFile#mkdir() on Python < 3.11.
+
+        Extracted from https://github.com/python/cpython/pull/32160."""
+        # End in a single slash.
+        arcname = re.sub(r"/*$", "/", name)
+        # Unlike PermPreservingZipFile#zip_entry_from_file(), this should never correspond to a file
+        # on disk, as this class is used to synthesize zip files from cache and the created
+        # directories are also virtual. Giving it a non-zero timestamp would be misleading.
+        zinfo = zipfile.ZipInfo(filename=arcname, date_time=DETERMINISTIC_DATETIME.timetuple()[:6])
+        zinfo.file_size = 0
+        zinfo.external_attr = ((0o40000 | mode) & 0xFFFF) << 16
+        zinfo.external_attr |= 0x10
+        zinfo.compress_type = zipfile.ZIP_STORED
+        self.writestr(zinfo, b"")
+
+    # This exists to placate mypy.
+    def __enter__(self):
+        # type: () -> MergeableZipFile
+        return self
+
+    def merge_archive(self, source_zf, name_prefix=None):
+        # type: (zipfile.ZipFile, Optional[str]) -> None
+        """Copy entries from `source_zf` into this archive without decompressing them.
+
+        If provided, `name_prefix` will be applied to the names of file and directory entries.
+
+        NB: This method is not aware of data descriptors, and will not copy over their contents!"""
+        assert self.fp is not None
+        assert isinstance(self.fp, io.BufferedIOBase)  # type: ignore[unreachable]
+        assert source_zf.fp is not None  # type: ignore[unreachable]
+        assert isinstance(source_zf.fp, io.BufferedIOBase)
+
+        for zinfo in source_zf.infolist():
+            # We will be mutating the ZipInfo and writing to the destination stream, so save this
+            # info upfront.
+            source_header_len = len(zinfo.FileHeader())
+            source_offset = zinfo.header_offset
+            start_of_destination_entry = self.fp.tell()
+
+            # (1) Modify the values which will affect the contents of the local file header for
+            # this entry.
+            if name_prefix:
+                zinfo.filename = os.path.join(name_prefix, zinfo.filename)
+
+            # (2) Modify the values which will affect this entry's central directory header.
+            # The new entry will begin at the very end of the existing file.
+            zinfo.header_offset = start_of_destination_entry
+
+            # (3) Generate the modified header, then copy over the header and contents.
+            # > (3.1) Copy over the header bytes verbatim into the output zip's underlying
+            #   file handle.
+            self.fp.write(zinfo.FileHeader())
+
+            # > (3.2) Seek to the start of the source entry's file contents.
+            source_zf.fp.seek(source_offset + source_header_len, io.SEEK_SET)
+            # > (3.3) Copy over the file data verbatim from the source zip's underlying
+            #   file handle.
+            copy_file_range(source_zf.fp, self.fp, zinfo.compress_size)
+
+            # (4) Hack the synthesized ZipInfo onto the destination zip's infos.
+            self.filelist.append(zinfo)
+            self.NameToInfo[zinfo.filename] = zinfo
+
+        # Update the central directory start position so the zipfile.ZipFile writes after all the
+        # data we just added.
+        self.start_dir = self.fp.tell()
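
Reviewer note: a minimal usage sketch of the zip-merging primitives added above, assuming the diffed modules land as-is. The output path, shebang, wheel name, and cached zip path below are hypothetical; only MergeableZipFile, buffered_zip_archive, their arguments, and the "w+b" open mode come from this diff.

    import io

    from pex.ziputils import MergeableZipFile, buffered_zip_archive

    # Open "w+b" so ZipFile can read back what was already written (see _build_zipapp).
    with io.open("app.pex", "w+b") as out:  # hypothetical output path
        out.write(b"#!/usr/bin/env python3\n")  # the shebang precedes the zip data
        with MergeableZipFile(out, mode="a") as zf:
            # Directory entries must be synthesized; the cached zips do not contain them.
            zf.mkdir(".deps")
            zf.mkdir(".deps/example-1.0-py3-none-any.whl")  # hypothetical dist
            # Entries are copied verbatim: each local file header is rewritten with the
            # prefixed name and new offset, but the (possibly compressed) file data is
            # never inflated or re-deflated on this path.
            with buffered_zip_archive("cached-wheel.zip") as src:  # hypothetical cache entry
                zf.merge_archive(src, name_prefix=".deps/example-1.0-py3-none-any.whl")

Both handles must be buffered (io.BufferedIOBase) because copy_file_range() relies on full reads and writes of exactly compress_size bytes; per the safe_io_open() docstring above, a plain open() would not guarantee that on py2.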