Skip to content

Commit

Permalink
squashed commit
Browse files Browse the repository at this point in the history
add change to do parallel zipping only, no crawling

modify cli arg format for medusa-zip tool

update cli arg format

fix non-exhaustive CopyMode usage

[BROKEN] add first run of complete medusa zip with cli arg!

the resulting zip cannot be zipimported yet....

medusa zipping works great now, let's revert .zip() changes

bump medusa options

bump more medusa options

use the merged medusa command lines now

manage a cache of parallel intermediate zip generation jobs!

small fix

much closer to mergeable now

working much more complex control flow between the medusa-zip cli

move medusa zip to medusa.py

medusa works for packed apps now too

works for everything, but kind of crazy

close stdin after writing to the child process

factor out a ridiculous amount of boilerplate

add back the non-medusa impl for packed layouts

implement a "normal" version which uses atomic directories

revert unintentional whitespace changes

separate the serial and parallel pex creations

remove the attempts at parallelism

add --medusa-path

remove unused code

make the medusa hook work when not provided

add back a tracer

revert some changes that make things harder to read

revert some changes i shouldn't need

make medusa work with the medusa-zip package and not subprocesses!

update after adding defaults in medusa-zip python package

remove -f arg for resolving medusa-zip

[BROKEN] possibly obsolete!

fix cli arg

add stitched layout

create stitch copymode

no

initial stitch impl
  • Loading branch information
cosmicexplorer committed Jul 26, 2023
1 parent 7d6ecdd commit e6ea967
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 25 deletions.
7 changes: 5 additions & 2 deletions pex/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
if TYPE_CHECKING:
from typing import (
Any,
BinaryIO,
Callable,
DefaultDict,
Iterable,
Expand All @@ -34,6 +35,7 @@
Set,
Sized,
Tuple,
Union,
)

# We use the start of MS-DOS time, which is what zipfiles use (see section 4.4.6 of
Expand Down Expand Up @@ -220,6 +222,7 @@ def _chmod(self, info, path):

@contextlib.contextmanager
def open_zip(path, *args, **kwargs):
# type: (Union[str, BinaryIO], Any, Any) -> Iterator[PermPreservingZipFile]
"""A contextmanager for zip files.
Passes through positional and kwargs to zipfile.ZipFile.
Expand Down Expand Up @@ -585,7 +588,7 @@ def delete(self):

def zip(
self,
filename, # type: str
output_file, # type: Union[str, BinaryIO]
mode="w", # type: str
deterministic_timestamp=False, # type: bool
exclude_file=lambda _: False, # type: Callable[[str], bool]
Expand All @@ -603,7 +606,7 @@ def zip(
selected_files = self.files()

compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
with open_zip(filename, mode, compression) as zf:
with open_zip(output_file, mode, compression) as zf:

def write_entry(
filename, # type: str
Expand Down
164 changes: 142 additions & 22 deletions pex/pex_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import logging
import os
import shutil
import subprocess
from contextlib import contextmanager

from pex import pex_warnings
from pex.atomic_directory import atomic_directory
Expand All @@ -20,6 +22,7 @@
safe_mkdir,
safe_mkdtemp,
safe_open,
temporary_dir,
)
from pex.compatibility import commonpath, to_bytes
from pex.compiler import Compiler
Expand All @@ -28,6 +31,7 @@
from pex.environment import PEXEnvironment
from pex.finders import get_entry_point_from_console_script, get_script_from_distributions
from pex.interpreter import PythonInterpreter
from pex.jobs import Job
from pex.layout import Layout
from pex.orderedset import OrderedSet
from pex.pex import PEX
Expand All @@ -37,9 +41,10 @@
from pex.tracer import TRACER
from pex.typing import TYPE_CHECKING
from pex.util import CacheHelper
from pex.ziputils import ZipCommand

if TYPE_CHECKING:
from typing import Dict, Optional
from typing import BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple


class CopyMode(Enum["CopyMode.Value"]):
Expand Down Expand Up @@ -88,14 +93,14 @@ def __maybe_run_venv__(pex, pex_root, pex_path):
venv_dir = venv_dir(
pex_file=pex,
pex_root=pex_root,
pex_root=pex_root,
pex_hash={pex_hash!r},
has_interpreter_constraints={has_interpreter_constraints!r},
pex_path=pex_path,
)
venv_pex = os.path.join(venv_dir, 'pex')
if not __execute__ or not is_exe(venv_pex):
# Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter.
# Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter.
return venv_dir
TRACER.log('Executing venv PEX for {{}} at {{}}'.format(pex, venv_pex))
Expand Down Expand Up @@ -434,6 +439,7 @@ def set_header(self, header):
self._header = header

def _add_dist_dir(self, path, dist_name, fingerprint=None):
# type: (str, str, Optional[str]) -> str
target_dir = os.path.join(self._pex_info.internal_cache, dist_name)
if self._copy_mode == CopyMode.SYMLINK:
self._copy_or_link(path, target_dir, label=dist_name)
Expand Down Expand Up @@ -550,6 +556,7 @@ def _copy_or_link(self, src, dst, label=None):
elif self._copy_mode == CopyMode.SYMLINK:
self._chroot.symlink(src, dst, label)
else:
assert self._copy_mode == CopyMode.LINK
self._chroot.link(src, dst, label)

def _prepare_bootstrap(self):
Expand Down Expand Up @@ -769,31 +776,144 @@ def zip_cache_dir(path):
os.path.join(internal_cache, location),
)

def _build_zipapp(
self,
filename, # type: str
deterministic_timestamp=False, # type: bool
compress=True, # type: bool
):
# type: (...) -> None
def _cache_dists_for_stitching(self, compress):
    # type: (bool) -> Dict[str, str]
    """Zip each distribution into a per-dist cache entry.

    Returns a mapping of dist label -> path to the cached single-dist zip.
    Cache entries are keyed on the dist fingerprint plus the compression
    setting so compressed and uncompressed caches never collide, and entry
    creation is guarded by atomic_directory so concurrent pex invocations
    can safely share the cache.
    """
    cached = {}  # type: Dict[str, str]
    suffix = "compressed" if compress else "uncompressed"
    with TRACER.timed("caching dists for stitched output", V=3):
        for label, fingerprint in self._pex_info.distributions.items():
            zip_path = os.path.join(
                self._pex_info.pex_root,
                "stitched_dists",
                "{}-{}".format(fingerprint, suffix),
                label,
            )
            with atomic_directory(os.path.dirname(zip_path)) as atomic_zip_dir:
                if not atomic_zip_dir.is_finalized():
                    work_file = os.path.join(
                        atomic_zip_dir.work_dir, os.path.basename(zip_path)
                    )
                    with TRACER.timed("caching single dist {}".format(label), V=3):
                        self._chroot.zip(
                            work_file,
                            labels=(label,),
                            deterministic_timestamp=True,
                            compress=compress,
                            exclude_file=is_pyc_temporary_file,
                        )
            # The atomic directory finalizes on context exit, so the cache
            # entry must exist by now.
            assert os.path.isfile(zip_path)
            cached[label] = zip_path

    return cached

@contextmanager
def _concatenate_cached_entries(self, zip_cmd, deterministic_timestamp, compress):
    # type: (ZipCommand, bool, bool) -> Iterator[BinaryIO]
    """Yield a read handle on a zipapp synthesized from per-dist cached zips.

    Sources not covered by the dist cache are zipped first, the cached
    single-dist zips are then byte-concatenated after them, and the external
    `zip` tool finally repairs the concatenation into one valid zip file.
    """
    dist_zips = self._cache_dists_for_stitching(compress=compress)
    remaining = sorted(frozenset(self._chroot.labels()) - frozenset(dist_zips.keys()))

    with TRACER.timed("synthesize zipapp", V=6), temporary_dir() as tmp:
        broken_zip = os.path.join(tmp, "concatenated.broken-zip")
        with open(broken_zip, "w+b") as out_fp:
            with TRACER.timed("zipping up uncached sources", V=3):
                self._chroot.zip(
                    out_fp,
                    deterministic_timestamp=deterministic_timestamp,
                    compress=compress,
                    labels=remaining,
                )

            with TRACER.timed("concatenating cached dist zips", V=3):
                # Append cached zips ordered by dist label so the merged
                # result is deterministic.
                for label in sorted(dist_zips):
                    with open(dist_zips[label], "rb") as in_fp:
                        shutil.copyfileobj(in_fp, out_fp)  # type: ignore[misc]

        repaired_zip = os.path.join(tmp, "fixed.zip")
        zip_cmd.fix_concatenated_zips(broken_zip, repaired_zip)

        with open(repaired_zip, "rb") as read_fp:
            yield read_fp

@contextmanager
def _prepare_executable_zipapp(self, filename):
    # type: (str) -> Iterator[BinaryIO]
    """Open `filename` for writing with the shebang (and any header) pre-written.

    Yields the open binary handle positioned just past the preamble so the
    caller can append the zip payload; the file is marked executable on exit.
    """
    with safe_open(filename, "wb") as out_fp:
        # safe_open truncates, so we must be starting from an empty file.
        assert os.path.getsize(out_fp.name) == 0
        out_fp.write(to_bytes("{}\n".format(self._shebang)))
        if self._header:
            out_fp.write(to_bytes(self._header))

        yield out_fp

    chmod_plus_x(out_fp.name)

def _uncached_zipapp(
self,
filename, # type: str
deterministic_timestamp, # type: bool
compress, # type: bool
):
# type: (...) -> None

# When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink distributions as
# pointers to installed wheel directories in ~/.pex/installed_wheels/... Since those
# installed wheels reside in a shared cache, they can be in-use by other processes and so
# their code may be in the process of being bytecode compiled as we attempt to zip up our
# chroot. Bytecode compilation produces ephemeral temporary pyc files that we should avoid
# copying since they are useless and inherently racy.
exclude_file = is_pyc_temporary_file

with TRACER.timed("Zipping PEX file."), self._prepare_executable_zipapp(
filename
) as pexfile:
self._chroot.zip(
filename,
mode="a",
pexfile,
deterministic_timestamp=deterministic_timestamp,
# When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink distributions
# as pointers to installed wheel directories in ~/.pex/installed_wheels/... Since
# those installed wheels reside in a shared cache, they can be in-use by other
# processes and so their code may be in the process of being bytecode compiled as we
# attempt to zip up our chroot. Bytecode compilation produces ephemeral temporary
# pyc files that we should avoid copying since they are useless and inherently
# racy.
exclude_file=is_pyc_temporary_file,
compress=compress,
exclude_file=exclude_file,
)
chmod_plus_x(filename)

def _build_zipapp(
    self,
    filename,  # type: str
    deterministic_timestamp=False,  # type: bool
    compress=True,  # type: bool
):
    # type: (...) -> None
    """Write the executable zipapp PEX to `filename`.

    Compressing many distributions on every invocation costs a lot of I/O and
    CPU, so when the PEX bundles distributions and compression is requested we
    synthesize the output from per-dist cached zips. That path needs the
    external `zip` tool to repair the concatenated archive; without it (or for
    uncompressed or dist-free PEXes) we fall back to zipping the whole chroot
    directly.
    """
    zip_cmd = None  # type: Optional[ZipCommand]
    if self._pex_info.distributions and compress:
        zip_cmd = ZipCommand.find()
        if zip_cmd is None:
            TRACER.log(
                "`zip` command was not found, so compressed dist caches could not be used",
                V=1,
            )

    if zip_cmd is None:
        self._uncached_zipapp(
            filename, deterministic_timestamp=deterministic_timestamp, compress=compress
        )
        return

    with TRACER.timed(
        "cache dists and synthesize zipapp", V=9
    ), self._concatenate_cached_entries(
        zip_cmd,
        deterministic_timestamp=deterministic_timestamp,
        compress=compress,
    ) as concatenated_fp:
        with TRACER.timed(
            "copying synthesized concatenated zip to output file", V=9
        ), self._prepare_executable_zipapp(filename) as pex_fp:
            shutil.copyfileobj(concatenated_fp, pex_fp)  # type: ignore[misc]
38 changes: 37 additions & 1 deletion pex/ziputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
import os
import shutil
import struct
import subprocess

from pex.jobs import Job
from pex.tracer import TRACER
from pex.typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import BinaryIO, Optional
from typing import Any, BinaryIO, ClassVar, Optional

import attr # vendor:skip
else:
Expand Down Expand Up @@ -242,3 +245,36 @@ def isolate_zip(self, out_fp):
if self.has_header:
in_fp.seek(self.header_size, os.SEEK_SET)
shutil.copyfileobj(in_fp, out_fp)


@attr.s(frozen=True)
class ZipCommand(object):
    """A handle to the system `zip` executable.

    Used to repair byte-concatenated zips into a single valid archive via
    `zip -FF`.
    """

    # Absolute path to the located `zip` executable.
    exe_path = attr.ib()  # type: str

    # Process-wide cache of the located executable so PATH is only searched
    # once per successful lookup.
    _cached = None  # type: ClassVar[Optional[ZipCommand]]

    @classmethod
    def find(cls):
        # type: () -> Optional[ZipCommand]
        """Locate `zip` on the PATH, returning None if it is not installed."""
        if cls._cached is not None:
            return cls._cached

        # N.B.: `shutil.which` replaces the deprecated
        # `distutils.spawn.find_executable`; distutils is removed in
        # Python 3.12 (PEP 632).
        zip_path = shutil.which("zip")
        if zip_path:
            cls._cached = cls(exe_path=zip_path)
        return cls._cached

    def __call__(self, *args, **kwargs):
        # type: (*str, **Any) -> Job
        """Launch `zip` with the given args, returning a Job wrapping the process.

        Extra keyword args are passed through to `subprocess.Popen`.
        """
        command = [self.exe_path] + list(args)
        zip_proc = subprocess.Popen(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, encoding="utf-8", **kwargs
        )
        return Job(command, zip_proc)

    def fix_concatenated_zips(self, concatenated_zips_path, output_path):
        # type: (str, str) -> None
        """Rebuild a valid zip at `output_path` from naively concatenated zips.

        `zip -FF` re-scans the input for entries and writes a repaired archive
        with a consistent central directory.
        """
        with TRACER.timed("fixing up concatenated zips with `zip -FF`"):
            self("-FF", concatenated_zips_path, "--out", output_path).wait()

0 comments on commit e6ea967

Please sign in to comment.