Skip to content

Commit

Permalink
chore(sdk): Add inputUri and outputUri placeholders in v1 (kubeflow#4913
Browse files Browse the repository at this point in the history
)

* SDK - Components - Added support for inputUri and outputUri placeholders

* remove InputUriSpec and OutputUriSpec

* Address review comments

Co-authored-by: Alexey Volkov <[email protected]>
  • Loading branch information
chensun and Ark-kun authored Dec 18, 2020
1 parent 5445ce8 commit 88dbfda
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 9 deletions.
47 changes: 43 additions & 4 deletions sdk/python/kfp/components/_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import copy
import sys
from collections import OrderedDict
from typing import Any, List, Mapping, NamedTuple, Sequence, Union
from typing import Any, Callable, List, Mapping, NamedTuple, Sequence, Union
from ._naming import _sanitize_file_name, _sanitize_python_function_name, generate_unique_name_conversion_table
from ._yaml_utils import load_yaml
from .structures import *
Expand Down Expand Up @@ -381,16 +381,24 @@ def component_default_to_func_default(component_default: str, is_optional: bool)
('input_paths', Mapping[str, str]),
('output_paths', Mapping[str, str]),
('inputs_consumed_by_value', Mapping[str, str]),
('input_uris', Mapping[str, str]),
('output_uris', Mapping[str, str]),
],
)


def _not_implemented(name: str) -> str:
raise NotImplementedError


def _resolve_command_line_and_paths(
component_spec: ComponentSpec,
arguments: Mapping[str, str],
input_path_generator=_generate_input_file_name,
output_path_generator=_generate_output_file_name,
argument_serializer=serialize_value,
input_path_generator: Callable[[str], str] = _generate_input_file_name,
output_path_generator: Callable[[str], str] = _generate_output_file_name,
argument_serializer: Callable[[str], str] = serialize_value,
input_uri_generator: Callable[[str], str] = _not_implemented,
output_uri_generator: Callable[[str], str] = _not_implemented,
) -> _ResolvedCommandLineAndPaths:
"""Resolves the command line argument placeholders. Also produces the maps of the generated inpuit/output paths."""
argument_values = arguments
Expand All @@ -409,6 +417,8 @@ def _resolve_command_line_and_paths(

input_paths = OrderedDict()
inputs_consumed_by_value = {}
input_uris = OrderedDict()
output_uris = OrderedDict()

def expand_command_part(arg) -> Union[str, List[str], None]:
if arg is None:
Expand Down Expand Up @@ -457,6 +467,33 @@ def expand_command_part(arg) -> Union[str, List[str], None]:

return output_filename

elif isinstance(arg, InputUriPlaceholder):
input_name = arg.input_name
input_argument = argument_values.get(input_name, None)
if input_name in argument_values:
input_uri = input_uri_generator(input_name)
input_uris[input_name] = input_uri
return input_uri
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
return None
else:
raise ValueError('No value provided for input {}'.format(input_name))

elif isinstance(arg, OutputUriPlaceholder):
output_name = arg.output_name
output_uri = output_uri_generator(output_name)
if arg.output_name in output_uris:
if output_uris[output_name] != output_uri:
raise ValueError(
'Conflicting output URIs specified for port {}: {} and {}'.format(
output_name, output_uris[output_name], output_uri))
else:
output_uris[output_name] = output_uri

return output_uri

elif isinstance(arg, ConcatPlaceholder):
expanded_argument_strings = expand_argument_list(arg.items)
return ''.join(expanded_argument_strings)
Expand Down Expand Up @@ -502,6 +539,8 @@ def expand_argument_list(argument_list):
input_paths=input_paths,
output_paths=output_paths,
inputs_consumed_by_value=inputs_consumed_by_value,
input_uris=input_uris,
output_uris=output_uris,
)


Expand Down
50 changes: 45 additions & 5 deletions sdk/python/kfp/components/_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
'InputValuePlaceholder',
'InputPathPlaceholder',
'OutputPathPlaceholder',
'InputUriPlaceholder',
'OutputUriPlaceholder',
'ConcatPlaceholder',
'IsPresentPlaceholder',
'IfPlaceholderStructure',
Expand Down Expand Up @@ -130,11 +132,46 @@ def __init__(self,
super().__init__(locals())


class InputUriPlaceholder(ModelBase): # Non-standard attr names
"""Represents a placeholder for the URI of an input artifact.
Represents the command-line argument placeholder that will be replaced at
run-time by the URI of the input artifact argument.
"""
_serialized_names = {
'input_name': 'inputUri',
}

def __init__(self,
input_name: str,
):
super().__init__(locals())


class OutputUriPlaceholder(ModelBase): # Non-standard attr names
"""Represents a placeholder for the URI of an output artifact.
Represents the command-line argument placeholder that will be replaced at
run-time by a URI of the output artifac where the program should write its
output data.
"""
_serialized_names = {
'output_name': 'outputUri',
}

def __init__(self,
output_name: str,
):
super().__init__(locals())


CommandlineArgumentType = Union[
str,
InputValuePlaceholder,
InputPathPlaceholder,
OutputPathPlaceholder,
InputUriPlaceholder,
OutputUriPlaceholder,
'ConcatPlaceholder',
'IfPlaceholder',
]
Expand Down Expand Up @@ -279,12 +316,15 @@ def verify_arg(arg):
elif isinstance(arg, list):
for arg2 in arg:
verify_arg(arg2)
elif isinstance(arg, (InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder)):
elif isinstance(arg, (InputUriPlaceholder, InputValuePlaceholder,
InputPathPlaceholder, IsPresentPlaceholder)):
if arg.input_name not in self._inputs_dict:
raise TypeError('Argument "{}" references non-existing input.'.format(arg))
elif isinstance(arg, OutputPathPlaceholder):
raise TypeError(
'Argument "{}" references non-existing input.'.format(arg))
elif isinstance(arg, (OutputUriPlaceholder, OutputPathPlaceholder)):
if arg.output_name not in self._outputs_dict:
raise TypeError('Argument "{}" references non-existing output.'.format(arg))
raise TypeError(
'Argument "{}" references non-existing output.'.format(arg))
elif isinstance(arg, ConcatPlaceholder):
for arg2 in arg.items:
verify_arg(arg2)
Expand All @@ -294,7 +334,7 @@ def verify_arg(arg):
verify_arg(arg.if_structure.else_value)
else:
raise TypeError('Unexpected argument "{}"'.format(arg))

verify_arg(container.command)
verify_arg(container.args)

Expand Down
37 changes: 37 additions & 0 deletions sdk/python/kfp/components_tests/test_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,43 @@ def test_prevent_passing_unserializable_objects_as_argument(self):
with self.assertRaises(TypeError):
component(input_1="value 1", input_2=open)

def test_input_output_uri_resolving(self):
component_text = textwrap.dedent('''\
inputs:
- {name: In1}
outputs:
- {name: Out1}
implementation:
container:
image: busybox
command:
- program
- --in1-uri
- {inputUri: In1}
- --out1-uri
- {outputUri: Out1}
'''
)
op = comp.load_component_from_text(text=component_text)
task = op(in1='foo')
resolved_cmd = _resolve_command_line_and_paths(
component_spec=task.component_ref.spec,
arguments=task.arguments,
input_uri_generator=lambda name: f"{{{{inputs[{name}].uri}}}}",
output_uri_generator=lambda name: f"{{{{outputs[{name}].uri}}}}",
)

self.assertEqual(
resolved_cmd.command,
[
'program',
'--in1-uri',
'{{inputs[In1].uri}}',
'--out1-uri',
'{{outputs[Out1].uri}}',
]
)

def test_check_type_validation_of_task_spec_outputs(self):
producer_component_text = '''\
outputs:
Expand Down

0 comments on commit 88dbfda

Please sign in to comment.