Upstream20241110 #51

Merged: 8 commits, Nov 11, 2024
31 changes: 22 additions & 9 deletions .github/workflows/test.yml
@@ -6,22 +6,34 @@ on:
     - main

 jobs:
   pre-commit:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
       - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1
       - uses: pre-commit/action@2c7b3805fd2a0fd8c1884dcaebf91fc102a13ecd # v3.0.1

   test:
     runs-on: ${{ matrix.os }}
     strategy:
       fail-fast: false
       matrix:
-        os: [ ubuntu-latest, macos-latest ]
-        python-version: [ "3.8", "3.10", "3.12" ]
-        resolver: [ mamba, conda, micromamba ]
+        os: [ubuntu-latest, macos-latest]
+        python-version: ["3.8", "3.10", "3.12"]
+        resolver: [mamba, conda, micromamba]
+        micromamba-version: ["1.5.10-0", "latest"]
+        mamba-version: ["mamba=1.5.10", "mamba"]
+        exclude:
+          - resolver: mamba
+            micromamba-version: "1.5.10-0"
+          - resolver: conda
+            micromamba-version: "1.5.10-0"
+          - resolver: conda
+            mamba-version: "mamba=1.5.10"
+          - resolver: micromamba
+            mamba-version: "mamba=1.5.10"
+          - python-version: "3.8"
+            mamba-version: "mamba=1.5.10"
     env:
       METAFLOW_CONDA_DEPENDENCY_RESOLVER: ${{ matrix.resolver }}
       METAFLOW_CONDA_TEST: 1
@@ -31,18 +43,19 @@ jobs:

       - uses: mamba-org/setup-micromamba@f8b8a1e23a26f60a44c853292711bacfd3eac822 # v1.9.0
         with:
-          micromamba-version: latest
+          micromamba-version: ${{ matrix.micromamba-version }}
           environment-file: dev-env.yml
           init-shell: bash
           create-args: >-
             python=${{ matrix.python-version }}
+            ${{ matrix.mamba-version }}

       - name: install nflx-extension
         shell: bash -eo pipefail -l {0}
         run: |
           which pip
           pip install -e . --force-reinstall -U

       - name: install bash
         if: runner.os == 'macOS'
         run: brew install bash
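The expanded matrix crosses resolver, micromamba-version, and mamba-version, and the exclude entries prune the combinations where a version pin is irrelevant to the chosen resolver (plus mamba 1.5.10 on Python 3.8). As a minimal sketch of how GitHub Actions evaluates these excludes (not part of the PR; an exclude entry drops every combination that matches all of its keys):

    from itertools import product

    # Matrix axes from the workflow above.
    axes = {
        "os": ["ubuntu-latest", "macos-latest"],
        "python-version": ["3.8", "3.10", "3.12"],
        "resolver": ["mamba", "conda", "micromamba"],
        "micromamba-version": ["1.5.10-0", "latest"],
        "mamba-version": ["mamba=1.5.10", "mamba"],
    }

    # Exclude entries from the workflow above.
    excludes = [
        {"resolver": "mamba", "micromamba-version": "1.5.10-0"},
        {"resolver": "conda", "micromamba-version": "1.5.10-0"},
        {"resolver": "conda", "mamba-version": "mamba=1.5.10"},
        {"resolver": "micromamba", "mamba-version": "mamba=1.5.10"},
        {"python-version": "3.8", "mamba-version": "mamba=1.5.10"},
    ]

    keys = list(axes)
    combos = [dict(zip(keys, values)) for values in product(*axes.values())]
    # A combination survives if no exclude entry matches all of its keys.
    surviving = [
        c
        for c in combos
        if not any(all(c[k] == v for k, v in e.items()) for e in excludes)
    ]
    print(len(combos), "combinations before excludes,", len(surviving), "after")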
5 changes: 3 additions & 2 deletions metaflow_extensions/netflix_ext/cmd/debug/constants.py
@@ -236,8 +236,9 @@ def merge_artifacts(

 # Escape hatch stub
 ESCAPE_HATCH_STUB = """
-import sys
-sys.path.insert(0, "{MetaflowEnvEscapeDir}")
+# Uncomment the following lines if os.environ.get("PYTHONPATH") is not set
+# import sys
+# sys.path.insert(0, "{MetaflowEnvEscapeDir}")
"""

 # Imports needed to define stubbed classes & Debug steps
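The trampoline lines are commented out by default because the companion change in debug_cmd.py (below) now injects the escape-trampoline directory into the kernel's PYTHONPATH; the stub only needs uncommenting when PYTHONPATH is not set. As a hedged sketch of how the template might be rendered into a generated debug script (the .format call and the directory are illustrative assumptions, not code from this PR):

    # Illustrative rendering of the stub; the directory is made up.
    ESCAPE_HATCH_STUB = """
    # Uncomment the following lines if os.environ.get("PYTHONPATH") is not set
    # import sys
    # sys.path.insert(0, "{MetaflowEnvEscapeDir}")
    """

    print(ESCAPE_HATCH_STUB.format(
        MetaflowEnvEscapeDir="/tmp/mf_debug/_escape_trampolines"
    ))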
27 changes: 27 additions & 0 deletions metaflow_extensions/netflix_ext/cmd/debug/debug_cmd.py
@@ -1,4 +1,5 @@
 import os
+import json
 import tarfile
 import datetime
@@ -287,9 +288,13 @@ def _generate_debug_scripts(
     if generate_notebook:
         kernel_def = _find_kernel_name(python_executable)
         if kernel_def:
+            _update_kernel_pythonpath(kernel_def[2], metaflow_root_dir)
             obj.echo(
                 f"Jupyter kernel name: {kernel_def[0]} with display name: {kernel_def[1]}"
             )
+            obj.echo(
+                f"Added escape trampolines to PYTHONPATH for the kernel {kernel_def[0]}"
+            )
             notebook_json = debug_script_generator.generate_debug_notebook(
                 metaflow_root_dir, debug_file_name, kernel_def
             )
@@ -304,3 +309,25 @@ def _generate_debug_scripts(
     # We copy the stub generators to the metaflow root directory as the stub generators
     # may not be present in the metaflow version that the user is using.
     copy_stub_generator_to_metaflow_root_dir(metaflow_root_dir)
+
+
+def _update_kernel_pythonpath(kernelspec_path, metaflow_root_dir):
+    """
+    Updates the kernelspec so that the escape trampolines are added to the PYTHONPATH.
+
+    Parameters
+    ----------
+    kernelspec_path : str
+        The kernelspec path.
+    metaflow_root_dir : str
+        The metaflow root directory.
+    """
+    kernel_json_path = os.path.join(kernelspec_path, "kernel.json")
+    with open(kernel_json_path, "r") as f:
+        kernel_json = json.load(f)
+
+    kernel_json.setdefault("env", {})["PYTHONPATH"] = os.path.abspath(
+        os.path.join(metaflow_root_dir, "_escape_trampolines")
+    )
+    with open(kernel_json_path, "w") as f:
+        json.dump(kernel_json, f, indent=4)
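A quick standalone way to see what `_update_kernel_pythonpath` does to a kernelspec (a sketch against a throwaway directory; the kernel.json contents and paths are made up, and the function from the diff above is assumed to be in scope):

    import json
    import os
    import tempfile

    # A throwaway kernelspec with a minimal kernel.json.
    spec_dir = tempfile.mkdtemp()
    with open(os.path.join(spec_dir, "kernel.json"), "w") as f:
        json.dump({"argv": ["python", "-m", "ipykernel_launcher"],
                   "display_name": "debug-env"}, f)

    _update_kernel_pythonpath(spec_dir, "/tmp/mf_debug")  # function from the diff above

    with open(os.path.join(spec_dir, "kernel.json")) as f:
        print(json.load(f)["env"]["PYTHONPATH"])
    # Prints an absolute path ending in /tmp/mf_debug/_escape_trampolines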
29 changes: 26 additions & 3 deletions metaflow_extensions/netflix_ext/cmd/debug/debug_stub_generator.py
@@ -167,9 +167,32 @@ def get_previous_tasks(self, previous_steps: List[Step]) -> List[Task]:
             return sorted(
                 [task for task in previous_steps[0].tasks()], key=lambda x: x.index
             )
-        # If the step is linear, split-foreach, split-static or a static-join, then we
-        # just return the tasks in the order they were run.
-        return [step.task for step in previous_steps]
+        # TODO: This method is incomplete and incorrect in the general case. We
+        # actually need more information to return the exact subset of tasks that are
+        # runtime parents in some situations with nested foreaches. For now, this is
+        # best effort, but we need to revisit/fix it. The primary limitation right now
+        # is that the `foreach-stack` field we refer to is capped at some number of
+        # characters, which can fail for deeply nested foreaches or even shallow ones
+        # with long values.
+        foreach_list = self.task.metadata_dict.get("foreach-stack", [])
+        if not foreach_list:
+            # We are not part of a foreach so return all the previous tasks. This will
+            # be a list of length 1 for everything except a join.
+            return [step.task for step in previous_steps]
+
+        # We are part of a foreach; we want to list tasks that either have the same
+        # foreach_list or match everything but the last element
+        def _filter(t):
+            t_foreach_list = t.metadata_dict.get("foreach-stack", [])
+            return (
+                len(t_foreach_list) == len(foreach_list)
+                and t_foreach_list == foreach_list
+            ) or t_foreach_list == foreach_list[:-1]
+
+        to_return = []
+        for step in previous_steps:
+            to_return.extend([task for task in step if _filter(task)])
+        return to_return

     def get_task_namespace(self) -> str:
         """
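To make the new parent-selection rule concrete: a task inside a foreach treats as runtime parents the tasks whose `foreach-stack` is identical to its own (same split) or equal to its own minus the last entry (the task that performed the split). A small sketch with made-up stacks, where plain lists of tuples stand in for the metadata entries:

    task_stack = [("outer", 0), ("inner", 2)]

    candidates = {
        "same stack (task in the same split)": [("outer", 0), ("inner", 2)],
        "different innermost index": [("outer", 0), ("inner", 1)],
        "stack minus last entry (the splitting task)": [("outer", 0)],
        "different outer branch": [("outer", 1), ("inner", 2)],
    }

    def is_runtime_parent(candidate_stack, stack):
        # Same rule as _filter above: identical stack, or identical except for
        # the innermost entry.
        return candidate_stack == stack or candidate_stack == stack[:-1]

    for name, cand in candidates.items():
        print(name, "->", is_runtime_parent(cand, task_stack))
    # Only the first and third candidates qualify as runtime parents.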
10 changes: 7 additions & 3 deletions metaflow_extensions/netflix_ext/cmd/debug/debug_utils.py
@@ -224,15 +224,19 @@ def _find_kernel_name(

     Returns
     -------
-    Optional[Tuple[str, str]]
-        The kernel name and the kernel's display name if found, None otherwise.
+    Optional[Tuple[str, str, str]]
+        The kernel name, kernel display name, and kernelspec path if found, None otherwise.
     """
     try:
         output = subprocess.check_output(["jupyter", "kernelspec", "list", "--json"])
         kernelspecs = json.loads(output)
         for kernel_name, kernel_spec in kernelspecs["kernelspecs"].items():
             if kernel_spec["spec"]["argv"][0] == python_executable:
-                return kernel_name, kernel_spec["spec"]["display_name"]
+                return (
+                    kernel_name,
+                    kernel_spec["spec"]["display_name"],
+                    kernel_spec["resource_dir"],
+                )
     except Exception as e:
         # Ignore the exception and return None as it is a best effort function
         print(f"Error finding kernel name: {traceback.format_exc()}")
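`_find_kernel_name` scans the registered kernelspecs and matches the one whose first argv entry is the target interpreter; the newly returned resource_dir is the directory holding kernel.json, which `_update_kernel_pythonpath` then edits. A sketch of that matching against a hand-written stand-in for the `jupyter kernelspec list --json` payload (paths and names invented):

    # Hand-written stand-in for the output of `jupyter kernelspec list --json`.
    kernelspecs = {
        "kernelspecs": {
            "debug-env": {
                "resource_dir": "/home/user/.local/share/jupyter/kernels/debug-env",
                "spec": {
                    "argv": ["/envs/debug/bin/python", "-m", "ipykernel_launcher",
                             "-f", "{connection_file}"],
                    "display_name": "Python (debug-env)",
                },
            }
        }
    }

    python_executable = "/envs/debug/bin/python"
    for kernel_name, kernel_spec in kernelspecs["kernelspecs"].items():
        if kernel_spec["spec"]["argv"][0] == python_executable:
            print(kernel_name, kernel_spec["spec"]["display_name"],
                  kernel_spec["resource_dir"])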
148 changes: 85 additions & 63 deletions metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py
@@ -21,6 +21,7 @@
 from metaflow.plugins import DATASTORES
 from metaflow.metaflow_config import (
     CONDA_ALL_ARCHS,
+    CONDA_DEPENDENCY_RESOLVER,
     CONDA_TEST,
     CONDA_SYS_DEPENDENCIES,
     DEFAULT_DATASTORE,
@@ -241,6 +242,13 @@ def environment(
     help="Recreate the environment if it already exists and remove the `into` directory "
     "if it exists",
 )
+@click.option(
+    "--strict/--no-strict",
+    default=True,
+    is_flag=True,
+    show_default=True,
+    help="If True, fails if it cannot install the original Metaflow environment",
+)
 @click.option(
     "--into-dir",
     default=None,
@@ -269,6 +277,7 @@ def create(
     name: Optional[str],
     local_only: bool,
     force: bool,
+    strict: bool,
     into_dir: Optional[str],
     install_notebook: bool,
     pathspec: bool,
@@ -299,26 +308,31 @@
     else:
         os.makedirs(into_dir)
     if install_notebook and name is None:
-        raise click.BadOptionUsage("--install-notebook requires --name")
+        raise click.BadOptionUsage("install-notebook", "requires --name")

     code_pkg = None
-    mf_version = None
+    mf_version = ""
+    mf_extensions_info = None

     if pathspec:
         env_name = "step:%s" % env_name
     alias_type, resolved_alias = resolve_env_alias(env_name)
     if alias_type == AliasType.PATHSPEC:
         if not pathspec:
-            raise click.BadOptionUsage(
-                "--pathspec used but environment name is not a pathspec"
-            )
-        task = Step(resolved_alias, _namespace_check=False).task
-        code_pkg = task.code
-        mf_version = task.metadata_dict["metaflow_version"]
+            raise click.BadOptionUsage("pathspec", "environment name is not a pathspec")
+        step = Step(resolved_alias, _namespace_check=False)
+        parameters_task = Step(
+            "%s/_parameters" % step.parent.pathspec, _namespace_check=False
+        ).task
+        code_pkg = step.task.code
+        # We use parameters_task to allow the creation of an environment for a task
+        # that may not have fully run.
+        mf_version = parameters_task.metadata_dict.get("metaflow_version", "")
+        mf_extensions_info = parameters_task["_graph_info"].data.get("extensions")
     else:
         if pathspec:
             raise click.BadOptionUsage(
-                "--pathspec not used but environment name is a pathspec"
+                "pathspec", "missing --pathspec; environment name is a pathspec"
             )

     env_id_for_alias = cast(Conda, obj.conda).env_id_from_alias(
@@ -339,61 +353,66 @@
     # We need to install ipykernel into the resolved environment
     obj.echo(" Resolving an environment compatible with Jupyter ...", nl=False)

-    # We use envsresolver to properly deal with builder environments and what not
-    resolver = EnvsResolver(obj.conda)
-    # We force the env_type to be the same as the base env since we don't modify that
-    # by adding these deps.
-
-    # We also force the use of use_latest because we are not really doing anything
-    # that would require a re-resolve (ie: the user doesn't really care about the
-    # version of ipykernel most likely).
-    resolver.add_environment(
-        arch_id(),
-        user_deps={
-            "pypi" if env.env_type == EnvType.PYPI_ONLY else "conda": ["ipykernel"]
-        },
-        user_sources={},
-        extras={},
-        base_env=env,
-        local_only=local_only,
-        use_latest=":any:",
-    )
-    resolver.resolve_environments(obj.echo)
-    update_envs = []  # type: List[ResolvedEnvironment]
-    if obj.datastore_type != "local" or CONDA_TEST:
-        # We may need to update caches
-        # Note that it is possible that something we needed to resolve, we don't need
-        # to cache (if we resolved to something already cached).
-        formats = set()  # type: Set[str]
-        for _, resolved_env, f, _ in resolver.need_caching_environments(
-            include_builder_envs=True
-        ):
-            update_envs.append(resolved_env)
-            formats.update(f)
-
-        cast(Conda, obj.conda).cache_environments(
-            update_envs, {"conda": list(formats)}
-        )
-    else:
-        update_envs = [
-            resolved_env
-            for _, resolved_env, _ in resolver.new_environments(
-                include_builder_envs=True
-            )
-        ]
-    cast(Conda, obj.conda).add_environments(update_envs)
-
-    # Update the default environment
-    for _, resolved_env, _ in resolver.resolved_environments(
-        include_builder_envs=True
-    ):
-        cast(Conda, obj.conda).set_default_environment(resolved_env.env_id)
-
-    cast(Conda, obj.conda).write_out_environments()
-
-    # We are going to be creating this new environment going forward (not the
-    # initial env we got)
-    _, env, _ = next(resolver.resolved_environments())
+    # We first check if `ipykernel` already exists in the environment. If it does, we
+    # can skip the whole resolution process.
+    if not any("ipykernel" == p.package_name for p in env.packages):
+        # We use envsresolver to properly deal with builder environments and what not
+        resolver = EnvsResolver(obj.conda)
+        # We force the env_type to be the same as the base env since we don't modify
+        # that by adding these deps.
+
+        # We also force the use of use_latest because we are not really doing
+        # anything that would require a re-resolve (ie: the user doesn't really
+        # care about the version of ipykernel most likely).
+        resolver.add_environment(
+            arch_id(),
+            user_deps={
+                "pypi" if env.env_type == EnvType.PYPI_ONLY else "conda": [
+                    "ipykernel"
+                ]
+            },
+            user_sources={},
+            extras={},
+            base_env=env,
+            local_only=local_only,
+            use_latest=":any:",
+        )
+        resolver.resolve_environments(obj.echo)
+        update_envs = []  # type: List[ResolvedEnvironment]
+        if obj.datastore_type != "local" or CONDA_TEST:
+            # We may need to update caches
+            # Note that it is possible that something we needed to resolve, we don't
+            # need to cache (if we resolved to something already cached).
+            formats = set()  # type: Set[str]
+            for _, resolved_env, f, _ in resolver.need_caching_environments(
+                include_builder_envs=True
+            ):
+                update_envs.append(resolved_env)
+                formats.update(f)
+
+            cast(Conda, obj.conda).cache_environments(
+                update_envs, {"conda": list(formats)}
+            )
+        else:
+            update_envs = [
+                resolved_env
+                for _, resolved_env, _ in resolver.new_environments(
+                    include_builder_envs=True
+                )
+            ]
+        cast(Conda, obj.conda).add_environments(update_envs)
+
+        # Update the default environment
+        for _, resolved_env, _ in resolver.resolved_environments(
+            include_builder_envs=True
+        ):
+            cast(Conda, obj.conda).set_default_environment(resolved_env.env_id)
+
+        cast(Conda, obj.conda).write_out_environments()
+
+        # We are going to be creating this new environment going forward (not the
+        # initial env we got)
+        _, env, _ = next(resolver.resolved_environments())

     delta_time = int(time.time() - start)
     obj.echo(" done in %d second%s." % (delta_time, plural_marker(delta_time)))
@@ -422,10 +441,12 @@ def create(
                 "Step '%s' does not have a code package -- "
                 "downloading active Metaflow version only" % env_name
             )
-        download_mf_version("./__conda_python", mf_version)
+        download_mf_version(
+            "./__conda_python", mf_version, mf_extensions_info, obj.echo, strict
+        )
         obj.echo(
-            "Code package for %s downloaded into '%s' -- `__conda_python` is "
-            "the executable to use" % (env_name, into_dir)
+            "Python executable `__conda_python` for environment '%s' downloaded "
+            "into '%s'" % (env_name, into_dir)
         )
     else:
         python_bin = os.path.join(obj.conda.create_for_name(name, env), "bin", "python")
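The new strict flag and the extensions info recorded in `_graph_info` are threaded into `download_mf_version` here. The PR does not show that function's body, so the following is only a hedged sketch of the behavior the flag suggests: fail when the recorded Metaflow version cannot be installed, otherwise fall back to the active one. Every helper name below is invented for illustration:

    # Hypothetical sketch; the real download_mf_version lives elsewhere in this
    # extension and may differ. Both helpers below are invented stand-ins.
    def fetch_exact_version(dest, version, extensions_info):
        raise RuntimeError("version %s not available" % version)  # simulate a miss

    def fetch_active_version(dest):
        print("installing active Metaflow into", dest)

    def download_mf_version_sketch(dest, mf_version, mf_extensions_info, echo, strict):
        try:
            fetch_exact_version(dest, mf_version, mf_extensions_info)
        except Exception as e:
            if strict:
                # --strict (the default): surface the failure to the user.
                raise
            # --no-strict: warn, then fall back to the currently active Metaflow.
            echo("Could not install Metaflow %s (%s); using active version"
                 % (mf_version, e))
            fetch_active_version(dest)

    download_mf_version_sketch("./__conda_python", "2.12.0", None, print, strict=False)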
@@ -498,8 +519,9 @@ def create(
             f.write("\n")
     else:
         obj.echo(
-            "Created environment '%s' locally, activate with `%s activate %s`"
-            % (name, obj.conda.binary("conda"), name)
+            "Conda environment '%s' created locally, activate with "
+            "`CONDA_ENVS_DIRS=%s %s activate %s`"
+            % (name, obj.conda.root_env_dir, CONDA_DEPENDENCY_RESOLVER, name)
         )
     cast(Conda, obj.conda).write_out_environments()