Skip to content

Commit

Permalink
Fix issues with configs and Runner (#2234)
Browse files Browse the repository at this point in the history
- Rename config_file to config to make it consistent with the Click API
 - Allow CustomFlowDecorators to work with Runner (currently, it was not finding the FlowSpec)
 - Remove hidden options from Click API
 - Small other fixes to get Runner to work with various combinations of CustomFlowDecorators and
   CustomStepDecorators
  • Loading branch information
romain-intel authored Feb 5, 2025
1 parent afd0910 commit 85da991
Show file tree
Hide file tree
Showing 22 changed files with 209 additions and 32 deletions.
6 changes: 3 additions & 3 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def start(
event_logger=None,
monitor=None,
local_config_file=None,
config_file=None,
config=None,
config_value=None,
**deco_options
):
Expand Down Expand Up @@ -383,7 +383,7 @@ def start(
# When we process the options, the first one processed will return None and the
# second one processed will return the actual options. The order of processing
# depends on what (and in what order) the user specifies on the command line.
config_options = config_file or config_value
config_options = config or config_value

if (
hasattr(ctx, "saved_args")
Expand All @@ -396,7 +396,7 @@ def start(
# if we need to in the first place
if getattr(ctx.obj, "has_cl_config_options", False):
raise click.UsageError(
"Cannot specify --config-file or --config-value with 'resume'"
"Cannot specify --config or --config-value with 'resume'"
)
# We now load the config artifacts from the original run id
run_id = None
Expand Down
4 changes: 2 additions & 2 deletions metaflow/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ def _options(mapping):
# keyword in Python, so we call it 'decospecs' in click args
if k == "decospecs":
k = "with"
if k in ("config_file", "config_value"):
if k in ("config", "config_value"):
# Special handling here since we gather them all in one option but actually
# need to send them one at a time using --config-value <name> kv.<name>.
# Note it can be either config_file or config_value depending
# Note it can be either config or config_value depending
# on click processing order.
for config_name in v.keys():
yield "--config-value"
Expand Down
16 changes: 11 additions & 5 deletions metaflow/flowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,18 @@ def _check_parameters(cls, config_parameters=False):
seen.add(norm)

@classmethod
def _process_config_decorators(cls, config_options, ignore_errors=False):
def _process_config_decorators(cls, config_options, process_configs=True):

# Fast path for no user configurations
if not cls._flow_state.get(_FlowState.CONFIG_DECORATORS) and all(
len(step.config_decorators) == 0 for step in cls._steps
if not process_configs or (
not cls._flow_state.get(_FlowState.CONFIG_DECORATORS)
and all(len(step.config_decorators) == 0 for step in cls._steps)
):
# Process parameters to allow them to also use config values easily
for var, param in cls._get_parameters():
if param.IS_CONFIG_PARAMETER:
continue
param.init(ignore_errors)
param.init(not process_configs)
return None

debug.userconf_exec("Processing mutating step/flow decorators")
Expand All @@ -258,6 +259,11 @@ def _process_config_decorators(cls, config_options, ignore_errors=False):
debug.userconf_exec("Setting config %s to %s" % (var, str(val)))
setattr(cls, var, val)

# Reset cached parameters since we have replaced configs already with ConfigValue
# so they are not parameters anymore to be re-evaluated when we do _get_parameters
if _FlowState.CACHED_PARAMETERS in cls._flow_state:
del cls._flow_state[_FlowState.CACHED_PARAMETERS]

# Run all the decorators. Step decorators are directly in the step and
# we will run those first and *then* we run all the flow level decorators
for step in cls._steps:
Expand All @@ -277,7 +283,7 @@ def _process_config_decorators(cls, config_options, ignore_errors=False):
setattr(cls, step.name, step)

mutable_flow = MutableFlow(cls)
for deco in cls._flow_state[_FlowState.CONFIG_DECORATORS]:
for deco in cls._flow_state.get(_FlowState.CONFIG_DECORATORS, []):
if isinstance(deco, CustomFlowDecorator):
# Sanity check to make sure we are applying the decorator to the right
# class
Expand Down
21 changes: 16 additions & 5 deletions metaflow/runner/click_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from metaflow.includefile import FilePathClass
from metaflow.metaflow_config import CLICK_API_PROCESS_CONFIG
from metaflow.parameters import JSONTypeClass, flow_context
from metaflow.user_configs.config_decorators import CustomFlowDecorator
from metaflow.user_configs.config_options import (
ConfigValue,
ConvertDictOrStr,
Expand Down Expand Up @@ -252,10 +253,14 @@ def extract_flow_class_from_file(flow_file: str) -> FlowSpec:
# Cache the loaded module
loaded_modules[flow_file] = module

classes = inspect.getmembers(module, inspect.isclass)
classes = inspect.getmembers(
module, lambda x: inspect.isclass(x) or isinstance(x, CustomFlowDecorator)
)
flow_cls = None

for _, kls in classes:
if isinstance(kls, CustomFlowDecorator):
kls = kls._flow_cls
if (
kls is not FlowSpec
and kls.__module__ == module_name
Expand Down Expand Up @@ -444,10 +449,10 @@ def _compute_flow_parameters(self):
ds = opts.get("datastore", defaults["datastore"])
quiet = opts.get("quiet", defaults["quiet"])
is_default = False
config_file = opts.get("config-file")
config_file = opts.get("config")
if config_file is None:
is_default = True
config_file = defaults.get("config_file")
config_file = defaults.get("config")

if config_file:
config_file = map(
Expand Down Expand Up @@ -480,7 +485,7 @@ def _compute_flow_parameters(self):
# Process both configurations; the second one will return all the merged
# configuration options properly processed.
self._config_input.process_configs(
self._flow_cls.__name__, "config_file", config_file, quiet, ds
self._flow_cls.__name__, "config", config_file, quiet, ds
)
config_options = self._config_input.process_configs(
self._flow_cls.__name__, "config_value", config_value, quiet, ds
Expand All @@ -493,7 +498,7 @@ def _compute_flow_parameters(self):
# it will init all parameters (config_options will be None)
# We ignore any errors if we don't check the configs in the click API.
new_cls = self._flow_cls._process_config_decorators(
config_options, ignore_errors=not CLICK_API_PROCESS_CONFIG
config_options, process_configs=CLICK_API_PROCESS_CONFIG
)
if new_cls:
self._flow_cls = new_cls
Expand Down Expand Up @@ -522,6 +527,12 @@ def extract_all_params(cmd_obj: Union[click.Command, click.Group]):
)
arg_parameters[each_param.name] = each_param
elif isinstance(each_param, click.Option):
if each_param.hidden:
# Skip hidden options because users should not be setting those.
# These are typically internal only options (used by the Runner in part
# for example to pass state files or configs to pass local-config-file).
continue

opt_params_sigs[each_param.name], annotations[each_param.name] = (
get_inspect_param_obj(each_param, inspect.Parameter.KEYWORD_ONLY)
)
Expand Down
33 changes: 19 additions & 14 deletions metaflow/user_configs/config_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,19 +406,6 @@ def __init__(self, *args, **kwargs):
self._args = args
self._kwargs = kwargs

def __get__(self, instance, owner):
# Required so that we "present" as a FlowSpec when the flow decorator is
# of the form
# @MyFlowDecorator
# class MyFlow(FlowSpec):
# pass
#
# In that case, if we don't have __get__, the object is a CustomFlowDecorator
# and not a FlowSpec. This is more critical for steps (and CustomStepDecorator)
# because other parts of the code rely on steps having is_step. There are
# other ways to solve this but this allowed for minimal changes going forward.
return self()

def __call__(
self, flow_spec: Optional["metaflow.flowspec.FlowSpecMeta"] = None
) -> "metaflow.flowspec.FlowSpecMeta":
Expand Down Expand Up @@ -447,6 +434,15 @@ def __call__(
raise MetaflowException(
"A CustomFlowDecorator can only be applied to a FlowSpec"
)
# NOTA: This returns self._flow_cls() because the object in the case of
# @FlowDecorator
# class MyFlow(FlowSpec):
# pass
# the object is a FlowDecorator and when the main function calls it, we end up
# here and need to actually call the FlowSpec. This is not the case when using
# a decorator with arguments because in the line above, we will have returned a
# FlowSpec object. Previous solution was to use __get__ but this does not seem
# to work properly.
return self._flow_cls()

def _set_flow_cls(
Expand Down Expand Up @@ -500,7 +496,16 @@ def __init__(self, *args, **kwargs):
self._kwargs = kwargs

def __get__(self, instance, owner):
# See explanation in CustomFlowDecorator.__get__
# Required so that we "present" as a step when the step decorator is
# of the form
# @MyStepDecorator
# @step
# def my_step(self):
# pass
#
# In that case, if we don't have __get__, the object is a CustomStepDecorator
# and not a step. Other parts of the code rely on steps having is_step. There are
# other ways to solve this but this allowed for minimal changes going forward.
return self()

def __call__(
Expand Down
2 changes: 1 addition & 1 deletion metaflow/user_configs/config_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def config_options_with_config_input(cmd):
cmd.params.insert(
0,
click.Option(
["--config", "config_file"],
["--config", "config"],
nargs=2,
multiple=True,
type=MultipleTuple([click.Choice(config_seen), ConvertPath()]),
Expand Down
4 changes: 2 additions & 2 deletions metaflow/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,10 @@ def dict_to_cli_options(params):
# keyword in Python, so we call it 'decospecs' in click args
if k == "decospecs":
k = "with"
if k in ("config_file", "config_value"):
if k in ("config", "config_value"):
# Special handling here since we gather them all in one option but actually
# need to send them one at a time using --config-value <name> kv.<name>
# Note it can be either config_file or config_value depending
# Note it can be either config or config_value depending
# on click processing order.
for config_name in v.keys():
yield "--config-value"
Expand Down
1 change: 1 addition & 0 deletions test/test_config/basic_config_silly.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
baz:amazing
21 changes: 21 additions & 0 deletions test/test_config/card_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import time
from metaflow import FlowSpec, step, Config, card


class CardConfigFlow(FlowSpec):

config = Config("config", default_value="")

@card(type=config.type)
@step
def start(self):
print("card type", self.config.type)
self.next(self.end)

@step
def end(self):
print("full config", self.config)


if __name__ == "__main__":
CardConfigFlow()
File renamed without changes.
30 changes: 30 additions & 0 deletions test/test_config/config_card.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import time
from metaflow import FlowSpec, step, card, current, Config, Parameter, config_expr
from metaflow.cards import Image

BASE = "https://picsum.photos/id"


class ConfigurablePhotoFlow(FlowSpec):
cfg = Config("config", default="photo_config.json")
id = Parameter("id", default=cfg.id, type=int)
size = Parameter("size", default=cfg.size, type=int)

@card
@step
def start(self):
import requests

params = {k: v for k, v in self.cfg.style.items() if v}
self.url = f"{BASE}/{self.id}/{self.size}/{self.size}"
img = requests.get(self.url, params)
current.card.append(Image(img.content))
self.next(self.end)

@step
def end(self):
pass


if __name__ == "__main__":
ConfigurablePhotoFlow()
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
60 changes: 60 additions & 0 deletions test/test_config/config_simple2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json
import os

from metaflow import (
Config,
FlowSpec,
Parameter,
config_expr,
current,
environment,
project,
step,
timeout,
)

default_config = {"blur": 123, "timeout": 10}


def myparser(s: str):
return {"hi": "you"}


class ConfigSimple(FlowSpec):

cfg = Config("cfg", default_value=default_config)
cfg_req = Config("cfg_req2", required=True)
blur = Parameter("blur", default=cfg.blur)
blur2 = Parameter("blur2", default=cfg_req.blur)
cfg_non_req = Config("cfg_non_req")
cfg_empty_default = Config("cfg_empty_default", default_value={})
cfg_empty_default_parser = Config(
"cfg_empty_default_parser", default_value="", parser=myparser
)
cfg_non_req_parser = Config("cfg_non_req_parser", parser=myparser)

@timeout(seconds=cfg["timeout"])
@step
def start(self):
print(
"Non req: %s; emtpy_default %s; empty_default_parser: %s, non_req_parser: %s"
% (
self.cfg_non_req,
self.cfg_empty_default,
self.cfg_empty_default_parser,
self.cfg_non_req_parser,
)
)
print("Blur is %s" % self.blur)
print("Blur2 is %s" % self.blur2)
print("Config is of type %s" % type(self.cfg))
self.next(self.end)

@step
def end(self):
print("Blur is %s" % self.blur)
print("Blur2 is %s" % self.blur2)


if __name__ == "__main__":
ConfigSimple()
File renamed without changes.
File renamed without changes.
18 changes: 18 additions & 0 deletions test/test_config/no_default.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from metaflow import Config, FlowSpec, card, step


class Sample(FlowSpec):
config = Config("config", default=None)

@card
@step
def start(self):
self.next(self.end)

@step
def end(self):
pass


if __name__ == "__main__":
Sample()
8 changes: 8 additions & 0 deletions test/test_config/photo_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": 1084,
"size": 400,
"style": {
"grayscale": true,
"blur": 5
}
}
17 changes: 17 additions & 0 deletions test/test_config/runner_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from metaflow import FlowSpec, Runner, step


class RunnerFlow(FlowSpec):
@step
def start(self):
with Runner("./mutable_flow.py") as r:
r.run()
self.next(self.end)

@step
def end(self):
print("Done")


if __name__ == "__main__":
RunnerFlow()
File renamed without changes.

0 comments on commit 85da991

Please sign in to comment.