Skip to content

Commit

Permalink
Modify getting Metaflow version (Netflix#2009)
Browse files Browse the repository at this point in the history
* Modify getting Metaflow version

Favors reading the INFO file if present to be able to have the
most accurate version of Metaflow when executing remotely (especially in
the presence of extensions).

Also limit reading the INFO file to once per process (as opposed to possibly
twice).

Finally, gets the version of the source of Metaflow (and not the current
directory)

* Addressed comments

* Add support for overriding version string

* Fix issue with brew (hopefully)

* Revert "Add support for overriding version string"

This reverts commit 0434e97.

* Add installed_extensions information (Netflix#2018)

* Add installed_extensions information

This adds information to the system tags about the metaflow extensions that
are installed. This makes it easier to reproduce the exact same
environment. Information about whether the package information is complete or
not is also included.

This PR also fixes some issues with extension loading and allows extension
information to be gathered in a programmatic fashion thereby enabling the
discovery of extensions.

* Allow users to configure where extensions are searched

* Move EXTENSIONS_SEARCH_DIRS out of config

* Move the extension installation information to _graph_info

* Use .metadata[X] for name and version since not all importlibs have the aliases
  • Loading branch information
romain-intel authored Sep 16, 2024
1 parent a7bbc57 commit a85ee20
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 127 deletions.
32 changes: 11 additions & 21 deletions metaflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,8 @@ class and related decorators.
Metaflow GitHub page.
"""

import importlib
import os
import sys
import types

from os import path

CURRENT_DIRECTORY = path.dirname(path.abspath(__file__))
INFO_FILE = path.join(path.dirname(CURRENT_DIRECTORY), "INFO")

from metaflow.extension_support import (
alias_submodules,
Expand All @@ -61,7 +55,6 @@ class and related decorators.
_ext_debug,
)


# We load the module overrides *first* explicitly. Non overrides can be loaded
# in toplevel as well but these can be loaded first if needed. Note that those
# modules should be careful not to include anything in Metaflow at their top-level
Expand All @@ -79,9 +72,14 @@ class and related decorators.
)
tl_module = m.module.__dict__.get("toplevel", None)
if tl_module is not None:
_tl_modules.append(".".join([EXT_PKG, m.tl_package, "toplevel", tl_module]))
_tl_modules.append(
(
m.package_name,
".".join([EXT_PKG, m.tl_package, "toplevel", tl_module]),
)
)
_ext_debug("Got overrides to load: %s" % _override_modules)
_ext_debug("Got top-level imports: %s" % _tl_modules)
_ext_debug("Got top-level imports: %s" % str(_tl_modules))
except Exception as e:
_ext_debug("Error in importing toplevel/overrides: %s" % e)

Expand Down Expand Up @@ -153,25 +151,17 @@ class and related decorators.
from .runner.deployer import Deployer
from .runner.nbdeploy import NBDeployer

__version_addl__ = []
__ext_tl_modules__ = []
_ext_debug("Loading top-level modules")
for m in _tl_modules:
for pkg_name, m in _tl_modules:
extension_module = load_module(m)
if extension_module:
tl_package = m.split(".")[1]
load_globals(extension_module, globals(), extra_indent=True)
lazy_load_aliases(
alias_submodules(extension_module, tl_package, None, extra_indent=True)
)
version_info = getattr(extension_module, "__mf_extensions__", "<unk>")
if extension_module.__version__:
version_info = "%s(%s)" % (version_info, extension_module.__version__)
__version_addl__.append(version_info)

if __version_addl__:
__version_addl__ = ";".join(__version_addl__)
else:
__version_addl__ = None
__ext_tl_modules__.append((pkg_name, extension_module))

# Erase all temporary names to avoid leaking things
for _n in [
Expand Down
2 changes: 1 addition & 1 deletion metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from metaflow.unbounded_foreach import CONTROL_TASK_TAG
from metaflow.util import cached_property, is_stringish, resolve_identity, to_unicode

from .. import INFO_FILE
from ..info_file import INFO_FILE
from .filecache import FileCache

try:
Expand Down
5 changes: 3 additions & 2 deletions metaflow/cmd/main_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ def start(ctx):

import metaflow

version = get_version()
echo("Metaflow ", fg="magenta", bold=True, nl=False)

if ctx.invoked_subcommand is None:
echo("(%s): " % get_version(), fg="magenta", bold=False, nl=False)
echo("(%s): " % version, fg="magenta", bold=False, nl=False)
else:
echo("(%s)\n" % get_version(), fg="magenta", bold=False)
echo("(%s)\n" % version, fg="magenta", bold=False)

if ctx.invoked_subcommand is None:
echo("More data science, less engineering\n", fg="magenta")
Expand Down
149 changes: 120 additions & 29 deletions metaflow/extension_support/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import print_function

import importlib
import json
import os
import re
import sys
Expand All @@ -11,6 +10,10 @@

from importlib.abc import MetaPathFinder, Loader
from itertools import chain
from pathlib import Path

from metaflow.info_file import read_info_file


#
# This file provides the support for Metaflow's extension mechanism which allows
Expand Down Expand Up @@ -59,6 +62,9 @@
"load_module",
"get_modules",
"dump_module_info",
"get_extensions_in_dir",
"extension_info",
"update_package_info",
"get_aliased_modules",
"package_mfext_package",
"package_mfext_all",
Expand All @@ -80,9 +86,14 @@
# To get verbose messages, set METAFLOW_DEBUG_EXT to 1
DEBUG_EXT = os.environ.get("METAFLOW_DEBUG_EXT", False)

# This is extracted only from environment variable and here separately from
# metaflow_config to prevent nasty circular dependencies
EXTENSIONS_SEARCH_DIRS = os.environ.get("METAFLOW_EXTENSIONS_SEARCH_DIRS", "").split(
os.pathsep
)

MFExtPackage = namedtuple("MFExtPackage", "package_name tl_package config_module")
MFExtModule = namedtuple("MFExtModule", "tl_package module")
MFExtModule = namedtuple("MFExtModule", "package_name tl_package module")


def load_module(module_name):
Expand Down Expand Up @@ -113,17 +124,64 @@ def get_modules(extension_point):
return modules_to_load


def dump_module_info():
_filter_files_all()
def dump_module_info(all_packages=None, pkgs_per_extension_point=None):
if all_packages is None:
all_packages = _all_packages
if pkgs_per_extension_point is None:
pkgs_per_extension_point = _pkgs_per_extension_point

_filter_files_all(all_packages)
sanitized_all_packages = dict()
# Strip out root_paths (we don't need it and no need to expose user's dir structure)
for k, v in _all_packages.items():
for k, v in all_packages.items():
sanitized_all_packages[k] = {
"root_paths": None,
"meta_module": v["meta_module"],
"files": v["files"],
"version": v["version"],
"reported_version": v.get("reported_version", "<unk>"),
"user_visible_name": v.get("user_visible_name", "<unk>"),
}
return "ext_info", [sanitized_all_packages, _pkgs_per_extension_point]
return "ext_info", [sanitized_all_packages, pkgs_per_extension_point]


def get_extensions_in_dir(d):
    """Discover the extension packages located under directory ``d``.

    The lookup is delegated to ``_get_extension_packages`` with the INFO file
    ignored and the search restricted to ``d`` only.

    Returns the two-element result of ``_get_extension_packages``, or
    ``(None, None)`` when extensions are not supported on this Python version.
    """
    if _mfext_supported:
        return _get_extension_packages(
            ignore_info_file=True, restrict_to_directories=[d]
        )

    _ext_debug("Not supported for your Python version -- 3.4+ is needed")
    return None, None


def extension_info(packages=None):
    """Summarize installed extensions so the result can be stored in _graph_info.

    Parameters
    ----------
    packages : dict, optional
        Mapping of package name to its information dictionary. Defaults to the
        module-level ``_all_packages`` when not provided.

    Returns
    -------
    dict
        ``{"installed": {name: {...}}}`` where each entry carries the package's
        "version" (required key), and its "reported_version" and
        "user_visible_name" (both defaulting to "<unk>" when absent).
    """
    if packages is None:
        packages = _all_packages

    installed = {}
    for pkg_name, pkg_details in packages.items():
        installed[pkg_name] = {
            "version": pkg_details["version"],
            "reported_version": pkg_details.get("reported_version", "<unk>"),
            "user_visible_name": pkg_details.get("user_visible_name", "<unk>"),
        }
    return {"installed": installed}


def update_package_info(pkg_to_update=None, package_name=None, **kwargs):
    """Add new keys to a package's information dictionary.

    Parameters
    ----------
    pkg_to_update : dict, optional
        The package information dictionary to update in place. Takes
        precedence over ``package_name`` whenever it is not None (an empty
        dictionary is a valid target).
    package_name : str, optional
        Name used to look the package up in the module-level ``_all_packages``
        when ``pkg_to_update`` is not given.
    **kwargs
        Key/value pairs to add. Existing keys may not be overwritten.

    Returns
    -------
    dict or None
        The updated package dictionary (the same object that was updated), or
        None when no package could be resolved and nothing was requested.

    Raises
    ------
    ValueError
        If a key in ``kwargs`` already exists in the package, or if updates
        were requested but no package could be resolved.
    """
    # Test against None explicitly: an empty dict is falsy but is still a
    # perfectly valid package to update.
    if pkg_to_update is not None:
        pkg = pkg_to_update
    elif package_name:
        pkg = _all_packages.get(package_name)
    else:
        pkg = None

    if pkg is None:
        if kwargs:
            raise ValueError(
                "Cannot update package information: no package found "
                "(package_name=%s)" % str(package_name)
            )
        return None

    # Validate every key before mutating anything so that a conflict does not
    # leave the package partially updated.
    for k in kwargs:
        if k in pkg:
            raise ValueError(
                "Trying to overwrite existing key '%s' for package %s" % (k, str(pkg))
            )
    pkg.update(kwargs)
    return pkg


def get_aliased_modules():
Expand All @@ -134,8 +192,8 @@ def package_mfext_package(package_name):
from metaflow.util import to_unicode

_ext_debug("Packaging '%s'" % package_name)
_filter_files_package(package_name)
pkg_info = _all_packages.get(package_name, None)
_filter_files_package(pkg_info)
if pkg_info and pkg_info.get("root_paths", None):
single_path = len(pkg_info["root_paths"]) == 1
for p in pkg_info["root_paths"]:
Expand Down Expand Up @@ -296,28 +354,28 @@ def _ext_debug(*args, **kwargs):
print(init_str, *args, **kwargs)


def _get_extension_packages():
def _get_extension_packages(ignore_info_file=False, restrict_to_directories=None):
if not _mfext_supported:
_ext_debug("Not supported for your Python version -- 3.4+ is needed")
return {}, {}

# If we have an INFO file with the appropriate information (if running from a saved
# code package for example), we use that directly
# Pre-compute on _extension_points
from metaflow import INFO_FILE

try:
with open(INFO_FILE, encoding="utf-8") as contents:
all_pkg, ext_to_pkg = json.load(contents).get("ext_info", (None, None))
if all_pkg is not None and ext_to_pkg is not None:
_ext_debug("Loading pre-computed information from INFO file")
# We need to properly convert stuff in ext_to_pkg
for k, v in ext_to_pkg.items():
v = [MFExtPackage(*d) for d in v]
ext_to_pkg[k] = v
return all_pkg, ext_to_pkg
except IOError:
pass
info_content = read_info_file()
if not ignore_info_file and info_content:
all_pkg, ext_to_pkg = info_content.get("ext_info", (None, None))
if all_pkg is not None and ext_to_pkg is not None:
_ext_debug("Loading pre-computed information from INFO file")
# We need to properly convert stuff in ext_to_pkg
for k, v in ext_to_pkg.items():
v = [MFExtPackage(*d) for d in v]
ext_to_pkg[k] = v
return all_pkg, ext_to_pkg

# If the caller did not restrict the search, fall back to the directories
# configured through METAFLOW_EXTENSIONS_SEARCH_DIRS (if any)
if restrict_to_directories is None and EXTENSIONS_SEARCH_DIRS != [""]:
restrict_to_directories = EXTENSIONS_SEARCH_DIRS

# Check if we even have extensions
try:
Expand All @@ -331,6 +389,11 @@ def _get_extension_packages():
raise
return {}, {}

if restrict_to_directories:
restrict_to_directories = [
Path(p).resolve().as_posix() for p in restrict_to_directories
]

# There are two "types" of packages:
# - those installed on the system (distributions)
# - those present in the PYTHONPATH
Expand All @@ -343,8 +406,12 @@ def _get_extension_packages():
# At this point, we look at all the paths and create a set. As we find distributions
# that match it, we will remove from the set and then will be left with any
# PYTHONPATH "packages"
all_paths = set(extensions_module.__path__)
all_paths = set(Path(p).resolve().as_posix() for p in extensions_module.__path__)
_ext_debug("Found packages present at %s" % str(all_paths))
if restrict_to_directories:
_ext_debug(
"Processed packages will be restricted to %s" % str(restrict_to_directories)
)

list_ext_points = [x.split(".") for x in _extension_points]
init_ext_points = [x[0] for x in list_ext_points]
Expand Down Expand Up @@ -391,9 +458,20 @@ def _get_extension_packages():
# This is not 100% accurate because it is possible that at the same
# location there is a package and a non-package, but this is extremely
# unlikely so we are going to ignore this case.
dist_root = dist.locate_file(EXT_PKG).as_posix()
dist_root = dist.locate_file(EXT_PKG).resolve().as_posix()
all_paths.discard(dist_root)
dist_name = dist.metadata["Name"]
dist_version = dist.metadata["Version"]
if restrict_to_directories:
parent_dirs = list(
p.as_posix() for p in Path(dist_root).resolve().parents
)
if all(p not in parent_dirs for p in restrict_to_directories):
_ext_debug(
"Ignoring package at %s as it is not in the considered directories"
% dist_root
)
continue
if dist_name in mf_ext_packages:
_ext_debug(
"Ignoring duplicate package '%s' (duplicate paths in sys.path? (%s))"
Expand Down Expand Up @@ -537,6 +615,7 @@ def _get_extension_packages():
"root_paths": [dist_root],
"meta_module": meta_module,
"files": files_to_include,
"version": dist_version,
}
# At this point, we have all the packages that contribute to EXT_PKG,
# we now check to see if there is an order to respect based on dependencies. We will
Expand Down Expand Up @@ -605,6 +684,16 @@ def _get_extension_packages():
if len(all_paths_list) > 0:
_ext_debug("Non installed packages present at %s" % str(all_paths))
for package_count, package_path in enumerate(all_paths_list):
if restrict_to_directories:
parent_dirs = list(
p.as_posix() for p in Path(package_path).resolve().parents
)
if all(p not in parent_dirs for p in restrict_to_directories):
_ext_debug(
"Ignoring non-installed package at %s as it is not in "
"the considered directories" % package_path
)
continue
# We give an alternate name for the visible package name. It is
# not exposed to the end user but used to refer to the package, and it
# doesn't provide much additional information to have the full path
Expand Down Expand Up @@ -740,6 +829,7 @@ def _get_extension_packages():
"root_paths": [package_path],
"meta_module": meta_module,
"files": files_to_include,
"version": "_local_",
}

# Sanity check that we only have one package per configuration file.
Expand Down Expand Up @@ -868,12 +958,13 @@ def _get_extension_config(distribution_name, tl_pkg, extension_point, config_mod
_ext_debug("Package '%s' is rooted at %s" % (distribution_name, root_paths))
_all_packages[distribution_name]["root_paths"] = root_paths

return MFExtModule(tl_package=tl_pkg, module=extension_module)
return MFExtModule(
package_name=distribution_name, tl_package=tl_pkg, module=extension_module
)
return None


def _filter_files_package(package_name):
pkg = _all_packages.get(package_name)
def _filter_files_package(pkg):
if pkg and pkg["root_paths"] and pkg["meta_module"]:
meta_module = _attempt_load_module(pkg["meta_module"])
if meta_module:
Expand Down Expand Up @@ -902,8 +993,8 @@ def _filter_files_package(package_name):
pkg["files"] = new_files


def _filter_files_all():
for p in _all_packages:
def _filter_files_all(all_packages):
for p in all_packages.values():
_filter_files_package(p)


Expand Down
4 changes: 4 additions & 0 deletions metaflow/flowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
MissingInMergeArtifactsException,
UnhandledInMergeArtifactsException,
)

from .extension_support import extension_info

from .graph import FlowGraph
from .unbounded_foreach import UnboundedForeachInput
from .util import to_pod
Expand Down Expand Up @@ -208,6 +211,7 @@ def _set_constants(self, graph, kwargs):
for deco in flow_decorators(self)
if not deco.name.startswith("_")
],
"extensions": extension_info(),
}
self._graph_info = graph_info

Expand Down
Loading

0 comments on commit a85ee20

Please sign in to comment.