diff --git a/development/docs/build_block_docs.py b/development/docs/build_block_docs.py
index 2b64f77633..d5968849b6 100644
--- a/development/docs/build_block_docs.py
+++ b/development/docs/build_block_docs.py
@@ -104,7 +104,7 @@ def main() -> None:
         token=AUTOGENERATED_BLOCKS_LIST_TOKEN,
     )
     block_card_lines = []
-    blocks_description = describe_available_blocks()
+    blocks_description = describe_available_blocks(dynamic_blocks=[])
     block_type2manifest_type_identifier = {
         block.block_class: block.manifest_type_identifier
         for block in blocks_description.blocks
diff --git a/docker/dockerfiles/Dockerfile.onnx.lambda b/docker/dockerfiles/Dockerfile.onnx.lambda
index f877b3a657..7927cb044f 100644
--- a/docker/dockerfiles/Dockerfile.onnx.lambda
+++ b/docker/dockerfiles/Dockerfile.onnx.lambda
@@ -70,6 +70,7 @@ ENV API_LOGGING_ENABLED=True
 ENV MODEL_VALIDATION_DISABLED=True
 ENV ALLOW_NON_HTTPS_URL_INPUT=False
 ENV ALLOW_URL_INPUT_WITHOUT_FQDN=False
+ENV ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS=False
 
 WORKDIR ${LAMBDA_TASK_ROOT}
 RUN rm -rf /build
diff --git a/docker/dockerfiles/Dockerfile.onnx.lambda.slim b/docker/dockerfiles/Dockerfile.onnx.lambda.slim
index ccbbb5c0f7..a31efa51c7 100644
--- a/docker/dockerfiles/Dockerfile.onnx.lambda.slim
+++ b/docker/dockerfiles/Dockerfile.onnx.lambda.slim
@@ -64,6 +64,7 @@ ENV API_LOGGING_ENABLED=True
 ENV MODEL_VALIDATION_DISABLED=True
 ENV ALLOW_NON_HTTPS_URL_INPUT=False
 ENV ALLOW_URL_INPUT_WITHOUT_FQDN=False
+ENV ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS=False
 
 WORKDIR ${LAMBDA_TASK_ROOT}
diff --git a/docs/workflows/blocks.md b/docs/workflows/blocks.md
index adc30dd5ab..bf4095caf8 100644
--- a/docs/workflows/blocks.md
+++ b/docs/workflows/blocks.md
@@ -36,6 +36,7 @@ hide:

+

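Editor's note: the `ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS` variable pinned to `False` in the Lambda images above is read via `str2bool` in `inference/core/env.py` (see the env.py hunk further down). A minimal sketch of that pattern, assuming only what the patch shows (the local `str2bool` below is an illustrative stand-in for the repo's helper, not its exact implementation):

```python
import os


def str2bool(value) -> bool:
    # Accept a real boolean (e.g. the non-string default used in env.py)
    # as well as "true"/"false" strings coming from the environment.
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() == "true"


# Defaults to enabled; the Lambda Dockerfiles override it to False so that
# workflow definitions carrying custom Python code are rejected there.
ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS = str2bool(
    os.getenv("ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS", True)
)
```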
diff --git a/docs/workflows/kinds.md b/docs/workflows/kinds.md
index faeef32b67..3dbc94c961 100644
--- a/docs/workflows/kinds.md
+++ b/docs/workflows/kinds.md
@@ -8,26 +8,26 @@ resolved we need a simple type system - that's what we call `kinds`.
 
 ## List of `workflows` kinds
-* [`roboflow_project`](/workflows/kinds/roboflow_project): Roboflow project name
-* [`dictionary`](/workflows/kinds/dictionary): Dictionary
-* [`string`](/workflows/kinds/string): String value
+* [`list_of_values`](/workflows/kinds/list_of_values): List of values of any types
+* [`*`](/workflows/kinds/*): Equivalent of any element
 * [`Batch[dictionary]`](/workflows/kinds/batch_dictionary): Batch of dictionaries
-* [`Batch[keypoint_detection_prediction]`](/workflows/kinds/batch_keypoint_detection_prediction): `'predictions'` key from Keypoint Detection Model output
-* [`Batch[parent_id]`](/workflows/kinds/batch_parent_id): Identifier of parent for step output
-* [`Batch[classification_prediction]`](/workflows/kinds/batch_classification_prediction): `'predictions'` key from Classification Model outputs
-* [`roboflow_model_id`](/workflows/kinds/roboflow_model_id): Roboflow model id
 * [`Batch[top_class]`](/workflows/kinds/batch_top_class): Batch of string values representing top class predicted by classification model
+* [`integer`](/workflows/kinds/integer): Integer value
+* [`dictionary`](/workflows/kinds/dictionary): Dictionary
+* [`Batch[classification_prediction]`](/workflows/kinds/batch_classification_prediction): `'predictions'` key from Classification Model outputs
+* [`Batch[boolean]`](/workflows/kinds/batch_boolean): Boolean values batch
 * [`boolean`](/workflows/kinds/boolean): Boolean flag
+* [`Batch[prediction_type]`](/workflows/kinds/batch_prediction_type): String value with type of prediction
+* [`Batch[parent_id]`](/workflows/kinds/batch_parent_id): Identifier of parent for step output
+* [`string`](/workflows/kinds/string): String value
 * [`Batch[instance_segmentation_prediction]`](/workflows/kinds/batch_instance_segmentation_prediction): `'predictions'` key from Instance Segmentation Model outputs
-* [`*`](/workflows/kinds/*): Equivalent of any element
-* [`integer`](/workflows/kinds/integer): Integer value
+* [`float_zero_to_one`](/workflows/kinds/float_zero_to_one): `float` value in range `[0.0, 1.0]`
 * [`Batch[image_metadata]`](/workflows/kinds/batch_image_metadata): Dictionary with image metadata required by supervision
-* [`Batch[bar_code_detection]`](/workflows/kinds/batch_bar_code_detection): Prediction with barcode detection
 * [`Batch[image]`](/workflows/kinds/batch_image): Image in workflows
+* [`roboflow_project`](/workflows/kinds/roboflow_project): Roboflow project name
 * [`Batch[string]`](/workflows/kinds/batch_string): Batch of string values
-* [`list_of_values`](/workflows/kinds/list_of_values): List of values of any types
-* [`Batch[boolean]`](/workflows/kinds/batch_boolean): Boolean values batch
 * [`Batch[object_detection_prediction]`](/workflows/kinds/batch_object_detection_prediction): `'predictions'` key from Object Detection Model output
-* [`float_zero_to_one`](/workflows/kinds/float_zero_to_one): `float` value in range `[0.0, 1.0]`
-* [`Batch[prediction_type]`](/workflows/kinds/batch_prediction_type): String value with type of prediction
+* [`Batch[keypoint_detection_prediction]`](/workflows/kinds/batch_keypoint_detection_prediction): `'predictions'` key from Keypoint Detection Model output
+* [`Batch[bar_code_detection]`](/workflows/kinds/batch_bar_code_detection): Prediction with barcode detection
+* [`roboflow_model_id`](/workflows/kinds/roboflow_model_id): Roboflow model id
diff --git a/inference/core/entities/requests/workflows.py b/inference/core/entities/requests/workflows.py
index ab5aa07642..a82a1448f2 100644
--- a/inference/core/entities/requests/workflows.py
+++ b/inference/core/entities/requests/workflows.py
@@ -2,6 +2,10 @@
 
 from pydantic import BaseModel, Field
 
+from inference.core.workflows.execution_engine.dynamic_blocks.entities import (
+    DynamicBlockDefinition,
+)
+
 
 class WorkflowInferenceRequest(BaseModel):
     api_key: str = Field(
@@ -18,3 +22,9 @@ class WorkflowInferenceRequest(BaseModel):
 
 class WorkflowSpecificationInferenceRequest(WorkflowInferenceRequest):
     specification: dict
+
+
+class DescribeBlocksRequest(BaseModel):
+    dynamic_blocks_definitions: List[DynamicBlockDefinition] = Field(
+        default_factory=list, description="Dynamic blocks to be used."
+    )
diff --git a/inference/core/entities/responses/workflows.py b/inference/core/entities/responses/workflows.py
index 6ecaf41216..95c5b3fc33 100644
--- a/inference/core/entities/responses/workflows.py
+++ b/inference/core/entities/responses/workflows.py
@@ -146,3 +146,6 @@ class WorkflowsBlocksDescription(BaseModel):
     universal_query_language_description: UniversalQueryLanguageDescription = Field(
         description="Definitions of Universal Query Language operations and operators"
     )
+    dynamic_block_definition_schema: dict = Field(
+        description="Schema for dynamic block definition"
+    )
diff --git a/inference/core/env.py b/inference/core/env.py
index 84beea90fb..f0a9e9f7e5 100644
--- a/inference/core/env.py
+++ b/inference/core/env.py
@@ -393,6 +393,9 @@
 WORKFLOWS_REMOTE_EXECUTION_MAX_STEP_CONCURRENT_REQUESTS = int(
     os.getenv("WORKFLOWS_REMOTE_EXECUTION_MAX_STEP_CONCURRENT_REQUESTS", "8")
 )
+ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS = str2bool(
+    os.getenv("ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS", True)
+)
 
 MODEL_VALIDATION_DISABLED = str2bool(os.getenv("MODEL_VALIDATION_DISABLED", "False"))
diff --git a/inference/core/interfaces/http/handlers/__init__.py b/inference/core/interfaces/http/handlers/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/inference/core/interfaces/http/handlers/workflows.py b/inference/core/interfaces/http/handlers/workflows.py
new file mode 100644
index 0000000000..326e1e04e5
--- /dev/null
+++ b/inference/core/interfaces/http/handlers/workflows.py
@@ -0,0 +1,78 @@
+# TODO - for everyone: start migrating other handlers to bring relief to http_api.py
+from typing import List, Optional
+
+from inference.core.entities.responses.workflows import (
+    ExternalBlockPropertyPrimitiveDefinition,
+    ExternalWorkflowsBlockSelectorDefinition,
+    UniversalQueryLanguageDescription,
+    WorkflowsBlocksDescription,
+)
+from inference.core.workflows.core_steps.common.query_language.introspection.core import (
+    prepare_operations_descriptions,
+    prepare_operators_descriptions,
+)
+from inference.core.workflows.execution_engine.dynamic_blocks.block_assembler import (
+    compile_dynamic_blocks,
+)
+from inference.core.workflows.execution_engine.dynamic_blocks.entities import (
+    DynamicBlockDefinition,
+)
+from inference.core.workflows.execution_engine.introspection.blocks_loader import (
+    describe_available_blocks,
+)
+from inference.core.workflows.execution_engine.introspection.connections_discovery import (
+    discover_blocks_connections,
+)
+
+
+def handle_describe_workflows_blocks_request(
+    dynamic_blocks_definitions: Optional[List[DynamicBlockDefinition]] = None,
+) -> WorkflowsBlocksDescription:
+    if dynamic_blocks_definitions is None:
+        dynamic_blocks_definitions = []
+    dynamic_blocks = compile_dynamic_blocks(
+        dynamic_blocks_definitions=dynamic_blocks_definitions,
+    )
+    blocks_description = describe_available_blocks(dynamic_blocks=dynamic_blocks)
+    blocks_connections = discover_blocks_connections(
+        blocks_description=blocks_description,
+    )
+    kinds_connections = {
+        kind_name: [
+            ExternalWorkflowsBlockSelectorDefinition(
+                manifest_type_identifier=c.manifest_type_identifier,
+                property_name=c.property_name,
+                property_description=c.property_description,
+                compatible_element=c.compatible_element,
+                is_list_element=c.is_list_element,
+                is_dict_element=c.is_dict_element,
+            )
+            for c in connections
+        ]
+        for kind_name, connections in blocks_connections.kinds_connections.items()
+    }
+    primitives_connections = [
+        ExternalBlockPropertyPrimitiveDefinition(
+            manifest_type_identifier=primitives_connection.manifest_type_identifier,
+            property_name=primitives_connection.property_name,
+            property_description=primitives_connection.property_description,
+            type_annotation=primitives_connection.type_annotation,
+        )
+        for primitives_connection in blocks_connections.primitives_connections
+    ]
+    uql_operations_descriptions = prepare_operations_descriptions()
+    uql_operators_descriptions = prepare_operators_descriptions()
+    universal_query_language_description = (
+        UniversalQueryLanguageDescription.from_internal_entities(
+            operations_descriptions=uql_operations_descriptions,
+            operators_descriptions=uql_operators_descriptions,
+        )
+    )
+    return WorkflowsBlocksDescription(
+        blocks=blocks_description.blocks,
+        declared_kinds=blocks_description.declared_kinds,
+        kinds_connections=kinds_connections,
+        primitives_connections=primitives_connections,
+        universal_query_language_description=universal_query_language_description,
+        dynamic_block_definition_schema=DynamicBlockDefinition.schema(),
+    )
diff --git a/inference/core/interfaces/http/http_api.py b/inference/core/interfaces/http/http_api.py
index 2c4d6ed5fe..75c331f4a9 100644
--- a/inference/core/interfaces/http/http_api.py
+++ b/inference/core/interfaces/http/http_api.py
@@ -14,7 +14,6 @@
 from starlette.middleware.base import BaseHTTPMiddleware
 
 from inference.core import logger
-from inference.core.cache import cache
 from inference.core.devices.utils import GLOBAL_INFERENCE_SERVER_ID
 from inference.core.entities.requests.clip import (
     ClipCompareRequest,
@@ -43,6 +42,7 @@
     ClearModelRequest,
 )
 from inference.core.entities.requests.workflows import (
+    DescribeBlocksRequest,
     WorkflowInferenceRequest,
     WorkflowSpecificationInferenceRequest,
 )
@@ -74,9 +74,6 @@
     ServerVersionInfo,
 )
 from inference.core.entities.responses.workflows import (
-    ExternalBlockPropertyPrimitiveDefinition,
-    ExternalWorkflowsBlockSelectorDefinition,
-    UniversalQueryLanguageDescription,
     WorkflowInferenceResponse,
     WorkflowsBlocksDescription,
     WorkflowValidationStatus,
@@ -129,6 +126,9 @@
     WorkspaceLoadError,
 )
 from inference.core.interfaces.base import BaseInterface
+from inference.core.interfaces.http.handlers.workflows import (
+    handle_describe_workflows_blocks_request,
+)
 from inference.core.interfaces.http.orjson_utils import (
     orjson_response,
     serialise_workflow_result,
@@ -141,12 +141,9 @@
     InvalidInputTypeError,
     OperationTypeNotRecognisedError,
 )
-from inference.core.workflows.core_steps.common.query_language.introspection.core import (
-    prepare_operations_descriptions,
-    prepare_operators_descriptions,
-)
 from inference.core.workflows.entities.base import OutputDefinition
 from inference.core.workflows.errors import (
+    DynamicBlockError,
     ExecutionGraphStructureError,
     InvalidReferenceTargetError,
     ReferenceTypeError,
@@ -158,12 +155,6 @@
     parse_workflow_definition,
 )
 from inference.core.workflows.execution_engine.core import ExecutionEngine
-from inference.core.workflows.execution_engine.introspection.blocks_loader import (
-    describe_available_blocks,
-)
-from inference.core.workflows.execution_engine.introspection.connections_discovery import (
-    discover_blocks_connections,
-)
 from inference.models.aliases import resolve_roboflow_model_alias
 from inference.usage_tracking.collector import usage_collector
 
@@ -245,6 +236,7 @@ async def wrapped_route(*args, **kwargs):
             RuntimeInputError,
             InvalidInputTypeError,
             OperationTypeNotRecognisedError,
+            DynamicBlockError,
         ) as error:
             resp = JSONResponse(
                 status_code=400,
@@ -473,13 +465,10 @@ async def process_workflow_inference_request(
            workflow_specification: dict,
            background_tasks: Optional[BackgroundTasks],
        ) -> WorkflowInferenceResponse:
-           step_execution_mode = StepExecutionMode(WORKFLOWS_STEP_EXECUTION_MODE)
            workflow_init_parameters = {
                "workflows_core.model_manager": model_manager,
                "workflows_core.api_key": workflow_request.api_key,
                "workflows_core.background_tasks": background_tasks,
-               "workflows_core.cache": cache,
-               "workflows_core.step_execution_mode": step_execution_mode,
            }
            execution_engine = ExecutionEngine.init(
                workflow_definition=workflow_specification,
@@ -904,54 +893,35 @@ async def infer_from_workflow(
        @app.get(
            "/workflows/blocks/describe",
            response_model=WorkflowsBlocksDescription,
-           summary="[EXPERIMENTAL] Endpoint to get definition of workflows blocks that are accessible",
+           summary="[LEGACY] Endpoint to get definition of workflows blocks that are accessible",
            description="Endpoint provides detailed information about workflows building blocks that are "
            "accessible in the inference server. This information could be used to programmatically "
            "build / display workflows.",
+           deprecated=True,
        )
        @with_route_exceptions
        async def describe_workflows_blocks() -> WorkflowsBlocksDescription:
-           blocks_description = describe_available_blocks()
-           blocks_connections = discover_blocks_connections(
-               blocks_description=blocks_description,
-           )
-           kinds_connections = {
-               kind_name: [
-                   ExternalWorkflowsBlockSelectorDefinition(
-                       manifest_type_identifier=c.manifest_type_identifier,
-                       property_name=c.property_name,
-                       property_description=c.property_description,
-                       compatible_element=c.compatible_element,
-                       is_list_element=c.is_list_element,
-                       is_dict_element=c.is_dict_element,
-                   )
-                   for c in connections
-               ]
-               for kind_name, connections in blocks_connections.kinds_connections.items()
-           }
-           primitives_connections = [
-               ExternalBlockPropertyPrimitiveDefinition(
-                   manifest_type_identifier=primitives_connection.manifest_type_identifier,
-                   property_name=primitives_connection.property_name,
-                   property_description=primitives_connection.property_description,
-                   type_annotation=primitives_connection.type_annotation,
-               )
-               for primitives_connection in blocks_connections.primitives_connections
-           ]
-           uql_operations_descriptions = prepare_operations_descriptions()
-           uql_operators_descriptions = prepare_operators_descriptions()
-           universal_query_language_description = (
-               UniversalQueryLanguageDescription.from_internal_entities(
-                   operations_descriptions=uql_operations_descriptions,
-                   operators_descriptions=uql_operators_descriptions,
-               )
-           )
-           return WorkflowsBlocksDescription(
-               blocks=blocks_description.blocks,
-               declared_kinds=blocks_description.declared_kinds,
-               kinds_connections=kinds_connections,
-               primitives_connections=primitives_connections,
-               universal_query_language_description=universal_query_language_description,
+           return handle_describe_workflows_blocks_request()
+
+       @app.post(
+           "/workflows/blocks/describe",
+           response_model=WorkflowsBlocksDescription,
+           summary="[EXPERIMENTAL] Endpoint to get definition of workflows blocks that are accessible",
+           description="Endpoint provides detailed information about workflows building blocks that are "
+           "accessible in the inference server. This information could be used to programmatically "
+           "build / display workflows. Additionally - in request body one can specify a list of "
+           "dynamic block definitions which will be transformed into blocks and used to generate "
+           "schemas and definitions of connections",
+       )
+       @with_route_exceptions
+       async def describe_workflows_blocks(
+           request: Optional[DescribeBlocksRequest] = None,
+       ) -> WorkflowsBlocksDescription:
+           dynamic_blocks_definitions = None
+           if request is not None:
+               dynamic_blocks_definitions = request.dynamic_blocks_definitions
+           return handle_describe_workflows_blocks_request(
+               dynamic_blocks_definitions=dynamic_blocks_definitions
+           )
 
        @app.post(
@@ -965,6 +935,8 @@ async def describe_workflows_blocks() -> WorkflowsBlocksDescription:
        async def get_dynamic_block_outputs(
            step_manifest: Dict[str, Any]
        ) -> List[OutputDefinition]:
+           # Potentially TODO: dynamic blocks do not support dynamic outputs, but if it changes
+           # we need to provide dynamic blocks manifests here
            dummy_workflow_definition = {
                "version": "1.0",
                "inputs": [],
@@ -972,7 +944,8 @@ async def get_dynamic_block_outputs(
                "outputs": [],
            }
            parsed_definition = parse_workflow_definition(
-               raw_workflow_definition=dummy_workflow_definition
+               raw_workflow_definition=dummy_workflow_definition,
+               dynamic_blocks=[],
            )
            parsed_manifest = parsed_definition.steps[0]
            return parsed_manifest.get_actual_outputs()
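Editor's note: a hedged client-side sketch of the new POST endpoint added above. The server URL is an assumption; with an empty `dynamic_blocks_definitions` list the response should match the legacy GET endpoint:

```python
import requests

# Hypothetical local server address; adjust to your deployment.
response = requests.post(
    "http://localhost:9001/workflows/blocks/describe",
    json={"dynamic_blocks_definitions": []},
)
response.raise_for_status()
description = response.json()

# Fields defined by WorkflowsBlocksDescription, including the new
# dynamic_block_definition_schema added in this patch.
print(sorted(description.keys()))
```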
diff --git a/inference/core/interfaces/stream/inference_pipeline.py b/inference/core/interfaces/stream/inference_pipeline.py
index 4298b30bd7..6808fd57ad 100644
--- a/inference/core/interfaces/stream/inference_pipeline.py
+++ b/inference/core/interfaces/stream/inference_pipeline.py
@@ -556,10 +556,6 @@ def init_with_workflow(
             workflow_init_parameters["workflows_core.background_tasks"] = (
                 background_tasks
             )
-            workflow_init_parameters["workflows_core.cache"] = cache
-            workflow_init_parameters["workflows_core.step_execution_mode"] = (
-                StepExecutionMode.LOCAL
-            )
             execution_engine = ExecutionEngine.init(
                 workflow_definition=workflow_specification,
                 init_parameters=workflow_init_parameters,
diff --git a/inference/core/interfaces/stream/sinks.py b/inference/core/interfaces/stream/sinks.py
index 827c737cbb..3e9affbef8 100644
--- a/inference/core/interfaces/stream/sinks.py
+++ b/inference/core/interfaces/stream/sinks.py
@@ -80,8 +80,8 @@ def render_boxes(
             (for sequential input) or position in the batch (from 0 to batch_size-1).
 
     Returns: None
-    Side effects: on_frame_rendered() is called against the np.ndarray produced from video frame
-        and predictions.
+    Side effects: on_frame_rendered() is called against the tuple (stream_id, np.ndarray) produced from video
+        frame and predictions.
 
     Example:
         ```python
@@ -92,7 +92,11 @@ def render_boxes(
 
         output_size = (640, 480)
         video_sink = cv2.VideoWriter("output.avi", cv2.VideoWriter_fourcc(*"MJPG"), 25.0, output_size)
-        on_prediction = partial(render_boxes, display_size=output_size, on_frame_rendered=video_sink.write)
+        on_prediction = partial(
+            render_boxes,
+            display_size=output_size,
+            on_frame_rendered=lambda frame_data: video_sink.write(frame_data[1])
+        )
 
         pipeline = InferencePipeline.init(
              model_id="your-model/3",
@@ -105,7 +109,8 @@ def render_boxes(
         ```
         In this example, `render_boxes()` is used as a sink for `InferencePipeline` predictions - making frames with
-        predictions displayed to be saved into video file.
+        predictions displayed to be saved into video file. Please note that this is an oversimplified example of usage
+        which will not be robust against multiple streams - a better implementation is available in the `VideoFileSink` class.
    """
    sequential_input_provided = False
    if not isinstance(video_frame, list):
""" sequential_input_provided = False if not isinstance(video_frame, list): diff --git a/inference/core/workflows/core_steps/loader.py b/inference/core/workflows/core_steps/loader.py index 37fc716a8e..116b209839 100644 --- a/inference/core/workflows/core_steps/loader.py +++ b/inference/core/workflows/core_steps/loader.py @@ -1,5 +1,8 @@ -from typing import List, Type +from typing import Callable, List, Tuple, Type, Union +from inference.core.cache import cache +from inference.core.env import API_KEY, WORKFLOWS_STEP_EXECUTION_MODE +from inference.core.workflows.core_steps.common.entities import StepExecutionMode from inference.core.workflows.core_steps.flow_control.continue_if import ContinueIfBlock from inference.core.workflows.core_steps.formatters.expression import ExpressionBlock from inference.core.workflows.core_steps.formatters.first_non_empty_or_default import ( @@ -76,10 +79,63 @@ from inference.core.workflows.core_steps.transformations.relative_static_crop import ( RelativeStaticCropBlock, ) -from inference.core.workflows.prototypes.block import WorkflowBlock +from inference.core.workflows.entities.types import ( + BATCH_OF_BAR_CODE_DETECTION_KIND, + BATCH_OF_BOOLEAN_KIND, + BATCH_OF_CLASSIFICATION_PREDICTION_KIND, + BATCH_OF_DICTIONARY_KIND, + BATCH_OF_IMAGE_METADATA_KIND, + BATCH_OF_IMAGES_KIND, + BATCH_OF_INSTANCE_SEGMENTATION_PREDICTION_KIND, + BATCH_OF_KEYPOINT_DETECTION_PREDICTION_KIND, + BATCH_OF_OBJECT_DETECTION_PREDICTION_KIND, + BATCH_OF_PARENT_ID_KIND, + BATCH_OF_PREDICTION_TYPE_KIND, + BATCH_OF_QR_CODE_DETECTION_KIND, + BATCH_OF_SERIALISED_PAYLOADS_KIND, + BATCH_OF_STRING_KIND, + BATCH_OF_TOP_CLASS_KIND, + BOOLEAN_KIND, + DETECTION_KIND, + DICTIONARY_KIND, + FLOAT_KIND, + FLOAT_ZERO_TO_ONE_KIND, + IMAGE_KIND, + INSTANCE_SEGMENTATION_PREDICTION_KIND, + INTEGER_KIND, + KEYPOINT_DETECTION_PREDICTION_KIND, + LIST_OF_VALUES_KIND, + OBJECT_DETECTION_PREDICTION_KIND, + POINT_KIND, + ROBOFLOW_API_KEY_KIND, + ROBOFLOW_MODEL_ID_KIND, + ROBOFLOW_PROJECT_KIND, + STRING_KIND, + WILDCARD_KIND, + ZONE_KIND, + Kind, +) +from inference.core.workflows.prototypes.block import ( + WorkflowBlock, + WorkflowBlockManifest, +) + +REGISTERED_INITIALIZERS = { + "api_key": API_KEY, + "cache": cache, + "step_execution_mode": StepExecutionMode(WORKFLOWS_STEP_EXECUTION_MODE), +} -def load_blocks() -> List[Type[WorkflowBlock]]: +def load_blocks() -> List[ + Union[ + Type[WorkflowBlock], + Tuple[ + Type[WorkflowBlockManifest], + Callable[[Type[WorkflowBlockManifest]], WorkflowBlock], + ], + ] +]: return [ DetectionsConsensusBlock, ClipComparisonBlock, @@ -110,3 +166,41 @@ def load_blocks() -> List[Type[WorkflowBlock]]: DimensionCollapseBlock, FirstNonEmptyOrDefaultBlock, ] + + +def load_kinds() -> List[Kind]: + return [ + WILDCARD_KIND, + IMAGE_KIND, + BATCH_OF_IMAGES_KIND, + ROBOFLOW_MODEL_ID_KIND, + ROBOFLOW_PROJECT_KIND, + ROBOFLOW_API_KEY_KIND, + FLOAT_ZERO_TO_ONE_KIND, + LIST_OF_VALUES_KIND, + BATCH_OF_SERIALISED_PAYLOADS_KIND, + BOOLEAN_KIND, + BATCH_OF_BOOLEAN_KIND, + INTEGER_KIND, + STRING_KIND, + BATCH_OF_STRING_KIND, + BATCH_OF_TOP_CLASS_KIND, + FLOAT_KIND, + DICTIONARY_KIND, + BATCH_OF_DICTIONARY_KIND, + BATCH_OF_CLASSIFICATION_PREDICTION_KIND, + DETECTION_KIND, + POINT_KIND, + ZONE_KIND, + OBJECT_DETECTION_PREDICTION_KIND, + BATCH_OF_OBJECT_DETECTION_PREDICTION_KIND, + INSTANCE_SEGMENTATION_PREDICTION_KIND, + BATCH_OF_INSTANCE_SEGMENTATION_PREDICTION_KIND, + KEYPOINT_DETECTION_PREDICTION_KIND, + BATCH_OF_KEYPOINT_DETECTION_PREDICTION_KIND, + BATCH_OF_QR_CODE_DETECTION_KIND, + 
diff --git a/inference/core/workflows/errors.py b/inference/core/workflows/errors.py
index 1d52905100..1a7b9bda6f 100644
--- a/inference/core/workflows/errors.py
+++ b/inference/core/workflows/errors.py
@@ -33,6 +33,10 @@ def inner_error(self) -> Optional[Exception]:
         return self._inner_error
 
 
+class WorkflowEnvironmentConfigurationError(WorkflowError):
+    pass
+
+
 class WorkflowCompilerError(WorkflowError):
     pass
 
@@ -53,6 +57,10 @@ class BlockInterfaceError(WorkflowCompilerError):
     pass
 
 
+class DynamicBlockError(WorkflowCompilerError):
+    pass
+
+
 class WorkflowDefinitionError(WorkflowCompilerError):
     pass
diff --git a/inference/core/workflows/execution_engine/compiler/core.py b/inference/core/workflows/execution_engine/compiler/core.py
index 54104557db..399eb4b1b0 100644
--- a/inference/core/workflows/execution_engine/compiler/core.py
+++ b/inference/core/workflows/execution_engine/compiler/core.py
@@ -22,6 +22,9 @@
     validate_workflow_specification,
 )
 from inference.core.workflows.execution_engine.debugger.core import dump_execution_graph
+from inference.core.workflows.execution_engine.dynamic_blocks.block_assembler import (
+    compile_dynamic_blocks,
+)
 from inference.core.workflows.execution_engine.introspection.blocks_loader import (
     load_initializers,
     load_workflow_blocks,
@@ -33,10 +36,16 @@ def compile_workflow(
     workflow_definition: dict,
     init_parameters: Dict[str, Union[Any, Callable[[None], Any]]],
 ) -> CompiledWorkflow:
-    available_blocks = load_workflow_blocks()
+    statically_defined_blocks = load_workflow_blocks()
     initializers = load_initializers()
+    dynamic_blocks = compile_dynamic_blocks(
+        dynamic_blocks_definitions=workflow_definition.get(
+            "dynamic_blocks_definitions", []
+        )
+    )
     parsed_workflow_definition = parse_workflow_definition(
         raw_workflow_definition=workflow_definition,
+        dynamic_blocks=dynamic_blocks,
     )
     validate_workflow_specification(workflow_definition=parsed_workflow_definition)
     execution_graph = prepare_execution_graph(
@@ -44,7 +53,7 @@ def compile_workflow(
     )
     steps = initialise_steps(
         steps_manifest=parsed_workflow_definition.steps,
-        available_bocks=available_blocks,
+        available_bocks=statically_defined_blocks + dynamic_blocks,
         explicit_init_parameters=init_parameters,
         initializers=initializers,
     )
diff --git a/inference/core/workflows/execution_engine/compiler/entities.py b/inference/core/workflows/execution_engine/compiler/entities.py
index 08b4cb2fee..9752993ad6 100644
--- a/inference/core/workflows/execution_engine/compiler/entities.py
+++ b/inference/core/workflows/execution_engine/compiler/entities.py
@@ -1,7 +1,7 @@
 from abc import abstractmethod
 from dataclasses import dataclass, field
 from enum import Enum
-from typing import Any, Dict, Generator, List, Optional, Set, Type, Union
+from typing import Any, Callable, Dict, Generator, List, Optional, Set, Type, Union
 
 import networkx as nx
diff --git a/inference/core/workflows/execution_engine/compiler/steps_initialiser.py b/inference/core/workflows/execution_engine/compiler/steps_initialiser.py
index 8603081a54..b7ce38c3a0 100644
--- a/inference/core/workflows/execution_engine/compiler/steps_initialiser.py
+++ b/inference/core/workflows/execution_engine/compiler/steps_initialiser.py
@@ -1,4 +1,5 @@
-from typing import Any, Callable, Dict, List, Union
+from dataclasses import replace
+from typing import Any, Callable, Dict, List, Tuple, Type, Union
 
 from inference.core.workflows.errors import (
     BlockInitParameterNotProvidedError,
@@ -8,6 +9,7 @@
 from inference.core.workflows.execution_engine.compiler.entities import (
     BlockSpecification,
     InitialisedStep,
+    ParsedWorkflowDefinition,
 )
 from inference.core.workflows.prototypes.block import WorkflowBlockManifest
diff --git a/inference/core/workflows/execution_engine/compiler/syntactic_parser.py b/inference/core/workflows/execution_engine/compiler/syntactic_parser.py
index 7a19799b56..17898d7ccc 100644
--- a/inference/core/workflows/execution_engine/compiler/syntactic_parser.py
+++ b/inference/core/workflows/execution_engine/compiler/syntactic_parser.py
@@ -7,17 +7,24 @@
 from inference.core.workflows.entities.base import InputType, JsonField
 from inference.core.workflows.errors import WorkflowSyntaxError
 from inference.core.workflows.execution_engine.compiler.entities import (
+    BlockSpecification,
     ParsedWorkflowDefinition,
 )
+from inference.core.workflows.execution_engine.dynamic_blocks.entities import (
+    DynamicBlockDefinition,
+)
 from inference.core.workflows.execution_engine.introspection.blocks_loader import (
+    load_all_defined_kinds,
     load_workflow_blocks,
 )
 
 
 def parse_workflow_definition(
-    raw_workflow_definition: dict,
+    raw_workflow_definition: dict, dynamic_blocks: List[BlockSpecification]
 ) -> ParsedWorkflowDefinition:
-    workflow_definition_class = build_workflow_definition_entity()
+    workflow_definition_class = build_workflow_definition_entity(
+        dynamic_blocks=dynamic_blocks,
+    )
     try:
         workflow_definition = workflow_definition_class.model_validate(
             raw_workflow_definition
@@ -36,8 +43,10 @@ def parse_workflow_definition(
     ) from e
 
 
-def build_workflow_definition_entity() -> Type[BaseModel]:
-    blocks = load_workflow_blocks()
+def build_workflow_definition_entity(
+    dynamic_blocks: List[BlockSpecification],
+) -> Type[BaseModel]:
+    blocks = load_workflow_blocks() + dynamic_blocks
     steps_manifests = tuple(block.manifest_class for block in blocks)
     block_manifest_types_union = Union[steps_manifests]
     block_type = Annotated[block_manifest_types_union, Field(discriminator="type")]
diff --git a/inference/core/workflows/execution_engine/dynamic_blocks/__init__.py b/inference/core/workflows/execution_engine/dynamic_blocks/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/inference/core/workflows/execution_engine/dynamic_blocks/block_assembler.py b/inference/core/workflows/execution_engine/dynamic_blocks/block_assembler.py
new file mode 100644
index 0000000000..07c55e7390
--- /dev/null
+++ b/inference/core/workflows/execution_engine/dynamic_blocks/block_assembler.py
@@ -0,0 +1,396 @@
+from copy import deepcopy
+from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union
+from uuid import uuid4
+
+from pydantic import BaseModel, ConfigDict, Field, create_model
+
+from inference.core.env import ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS
+from inference.core.workflows.entities.base import OutputDefinition
+from inference.core.workflows.entities.types import (
+    WILDCARD_KIND,
+    Kind,
+    StepOutputImageSelector,
+    StepOutputSelector,
+    WorkflowImageSelector,
+    WorkflowParameterSelector,
+)
+from inference.core.workflows.errors import (
+    DynamicBlockError,
+    WorkflowEnvironmentConfigurationError,
+)
+from inference.core.workflows.execution_engine.compiler.entities import (
+    BlockSpecification,
+)
+from inference.core.workflows.execution_engine.dynamic_blocks.block_scaffolding import (
+    assembly_custom_python_block,
+)
+from inference.core.workflows.execution_engine.dynamic_blocks.entities import (
+    BLOCK_SOURCE,
+    DynamicBlockDefinition,
+    DynamicInputDefinition,
+    DynamicOutputDefinition,
+    ManifestDescription,
+    SelectorType,
+    ValueType,
+)
+from inference.core.workflows.execution_engine.introspection.blocks_loader import (
+    load_all_defined_kinds,
+)
+from inference.core.workflows.execution_engine.introspection.utils import (
+    build_human_friendly_block_name,
+    get_full_type_name,
+)
+from inference.core.workflows.prototypes.block import WorkflowBlockManifest
+
+
+def compile_dynamic_blocks(
+    dynamic_blocks_definitions: List[dict],
+) -> List[BlockSpecification]:
+    if not dynamic_blocks_definitions:
+        return []
+    if not ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS:
+        raise WorkflowEnvironmentConfigurationError(
+            public_message="Cannot use dynamic blocks with custom Python code in this installation of `workflows`. "
+            "This can be changed by setting environmental variable "
+            "`ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS=True`",
+            context="workflow_compilation | dynamic_blocks_compilation",
+        )
+    all_defined_kinds = load_all_defined_kinds()
+    kinds_lookup = {kind.name: kind for kind in all_defined_kinds}
+    dynamic_blocks = [
+        DynamicBlockDefinition.model_validate(dynamic_block)
+        for dynamic_block in dynamic_blocks_definitions
+    ]
+    compiled_blocks = []
+    for dynamic_block in dynamic_blocks:
+        block_specification = create_dynamic_block_specification(
+            dynamic_block_definition=dynamic_block,
+            kinds_lookup=kinds_lookup,
+        )
+        compiled_blocks.append(block_specification)
+    return compiled_blocks
+
+
+def create_dynamic_block_specification(
+    dynamic_block_definition: DynamicBlockDefinition,
+    kinds_lookup: Dict[str, Kind],
+) -> BlockSpecification:
+    unique_identifier = str(uuid4())
+    block_manifest = assembly_dynamic_block_manifest(
+        unique_identifier=unique_identifier,
+        manifest_description=dynamic_block_definition.manifest,
+        kinds_lookup=kinds_lookup,
+    )
+    block_class = assembly_custom_python_block(
+        block_type_name=dynamic_block_definition.manifest.block_type,
+        unique_identifier=unique_identifier,
+        manifest=block_manifest,
+        python_code=dynamic_block_definition.code,
+    )
+    return BlockSpecification(
+        block_source=BLOCK_SOURCE,
+        identifier=get_full_type_name(selected_type=block_class),
+        block_class=block_class,
+        manifest_class=block_manifest,
+    )
+
+
+def assembly_dynamic_block_manifest(
+    unique_identifier: str,
+    manifest_description: ManifestDescription,
+    kinds_lookup: Dict[str, Kind],
+) -> Type[WorkflowBlockManifest]:
+    inputs_definitions = build_inputs(
+        block_type=manifest_description.block_type,
+        inputs=manifest_description.inputs,
+        kinds_lookup=kinds_lookup,
+    )
+    manifest_class = create_model(
+        f"DynamicBlockManifest[{unique_identifier}]",
+        __config__=ConfigDict(
+            extra="allow",
+            json_schema_extra={
+                "name": build_human_friendly_block_name(
+                    fully_qualified_name=manifest_description.block_type
+                )
+            },
+        ),
+        name=(str, ...),
+        type=(Literal[manifest_description.block_type], ...),
+        **inputs_definitions,
+    )
+    outputs_definitions = build_outputs_definitions(
+        block_type=manifest_description.block_type,
+        outputs=manifest_description.outputs,
+        kinds_lookup=kinds_lookup,
+    )
+    return assembly_manifest_class_methods(
+        block_type=manifest_description.block_type,
+        manifest_class=manifest_class,
+        outputs_definitions=outputs_definitions,
+        manifest_description=manifest_description,
+    )
+
+
+PYTHON_TYPES_MAPPING = {
+    ValueType.ANY: Any,
+    ValueType.INTEGER: int,
+    ValueType.FLOAT: float,
+    ValueType.BOOLEAN: bool,
+    ValueType.DICT: dict,
+    ValueType.LIST: list,
+    ValueType.STRING: str,
+}
+
+
+def build_inputs(
+    block_type: str,
+    inputs: Dict[str, DynamicInputDefinition],
+    kinds_lookup: Dict[str, Kind],
+) -> Dict[str, Tuple[type, Field]]:
+    result = {}
+    for input_name, input_definition in inputs.items():
+        result[input_name] = build_input(
+            block_type=block_type,
+            input_name=input_name,
+            input_definition=input_definition,
+            kinds_lookup=kinds_lookup,
+        )
+    return result
+
+
+def build_input(
+    block_type: str,
+    input_name: str,
+    input_definition: DynamicInputDefinition,
+    kinds_lookup: Dict[str, Kind],
+) -> Tuple[type, Field]:
+    input_type = build_input_field_type(
+        block_type=block_type,
+        input_name=input_name,
+        input_definition=input_definition,
+        kinds_lookup=kinds_lookup,
+    )
+    field_metadata = build_input_field_metadata(input_definition=input_definition)
+    return input_type, field_metadata
+
+
+def build_input_field_type(
+    block_type: str,
+    input_name: str,
+    input_definition: DynamicInputDefinition,
+    kinds_lookup: Dict[str, Kind],
+) -> type:
+    input_type_union_elements = collect_python_types_for_selectors(
+        block_type=block_type,
+        input_name=input_name,
+        input_definition=input_definition,
+        kinds_lookup=kinds_lookup,
+    )
+    input_type_union_elements += collect_python_types_for_values(
+        block_type=block_type,
+        input_name=input_name,
+        input_definition=input_definition,
+    )
+    if not input_type_union_elements:
+        raise DynamicBlockError(
+            public_message=f"There is no definition of input type found for property: {input_name} of "
+            f"dynamic block {block_type}.",
+            context="workflow_compilation | dynamic_block_compilation | manifest_compilation",
+        )
+    if len(input_type_union_elements) > 1:
+        input_type = Union[tuple(input_type_union_elements)]
+    else:
+        input_type = input_type_union_elements[0]
+    if input_definition.is_optional:
+        input_type = Optional[input_type]
+    return input_type
+
+
+def collect_python_types_for_selectors(
+    block_type: str,
+    input_name: str,
+    input_definition: DynamicInputDefinition,
+    kinds_lookup: Dict[str, Kind],
+) -> List[type]:
+    result = []
+    for selector_type in input_definition.selector_types:
+        selector_kind_names = input_definition.selector_data_kind.get(
+            selector_type, ["*"]
+        )
+        selector_kind = []
+        for kind_name in selector_kind_names:
+            if kind_name not in kinds_lookup:
+                raise DynamicBlockError(
+                    public_message=f"Could not find kind with name `{kind_name}` declared for input `{input_name}` "
+                    f"of dynamic block `{block_type}` within kinds that would be recognised by Execution "
+                    f"Engine knowing the following kinds: {list(kinds_lookup.keys())}.",
+                    context="workflow_compilation | dynamic_block_compilation | manifest_compilation",
+                )
+            selector_kind.append(kinds_lookup[kind_name])
+        if selector_type is SelectorType.INPUT_IMAGE:
+            result.append(WorkflowImageSelector)
+        elif selector_type is SelectorType.STEP_OUTPUT_IMAGE:
+            result.append(StepOutputImageSelector)
+        elif selector_type is SelectorType.INPUT_PARAMETER:
+            result.append(WorkflowParameterSelector(kind=selector_kind))
+        elif selector_type is SelectorType.STEP_OUTPUT:
+            result.append(StepOutputSelector(kind=selector_kind))
+        else:
+            raise DynamicBlockError(
+                public_message=f"Could not recognise selector type `{selector_type}` declared for input `{input_name}` "
+                f"of dynamic block `{block_type}`.",
+                context="workflow_compilation | dynamic_block_compilation | manifest_compilation",
+            )
+    return result
+
+
+def collect_python_types_for_values(
+    block_type: str,
+    input_name: str,
+    input_definition: DynamicInputDefinition,
+) -> List[type]:
+    result = []
+    for value_type_name in input_definition.value_types:
+        if value_type_name not in PYTHON_TYPES_MAPPING:
+            raise DynamicBlockError(
+                public_message=f"Could not resolve Python type `{value_type_name}` declared for input `{input_name}` "
+                f"of dynamic block `{block_type}` within types that would be recognised by Execution "
+                f"Engine knowing the following types: {list(PYTHON_TYPES_MAPPING.keys())}.",
+                context="workflow_compilation | dynamic_block_compilation | manifest_compilation",
+            )
+        value_type = PYTHON_TYPES_MAPPING[value_type_name]
+        result.append(value_type)
+    return result
+
+
+def build_input_field_metadata(input_definition: DynamicInputDefinition) -> Field:
+    if not input_definition.has_default_value:
+        return Field()
+    default_value = input_definition.default_value
+    field_metadata_params = {}
+    if default_holds_compound_object(default_value=default_value):
+        field_metadata_params["default_factory"] = lambda: deepcopy(default_value)
+    else:
+        field_metadata_params["default"] = default_value
+    field_metadata = Field(**field_metadata_params)
+    return field_metadata
+
+
+def default_holds_compound_object(default_value: Any) -> bool:
+    return (
+        isinstance(default_value, list)
+        or isinstance(default_value, dict)
+        or isinstance(default_value, set)
+    )
+
+
+def build_outputs_definitions(
+    block_type: str,
+    outputs: Dict[str, DynamicOutputDefinition],
+    kinds_lookup: Dict[str, Kind],
+) -> List[OutputDefinition]:
+    result = []
+    for name, definition in outputs.items():
+        if not definition.kind:
+            result.append(OutputDefinition(name=name, kind=[WILDCARD_KIND]))
+        else:
+            actual_kinds = collect_actual_kinds_for_output(
+                block_type=block_type,
+                output_name=name,
+                output=definition,
+                kinds_lookup=kinds_lookup,
+            )
+            result.append(OutputDefinition(name=name, kind=actual_kinds))
+    return result
+
+
+def collect_actual_kinds_for_output(
+    block_type: str,
+    output_name: str,
+    output: DynamicOutputDefinition,
+    kinds_lookup: Dict[str, Kind],
+) -> List[Kind]:
+    actual_kinds = []
+    for kind_name in output.kind:
+        if kind_name not in kinds_lookup:
+            raise DynamicBlockError(
+                public_message=f"Could not find kind with name `{kind_name}` declared for output `{output_name}` "
+                f"of dynamic block `{block_type}` within kinds that would be recognised by Execution "
+                f"Engine knowing the following kinds: {list(kinds_lookup.keys())}.",
+                context="workflow_compilation | dynamic_block_compilation | manifest_compilation",
+            )
+        actual_kinds.append(kinds_lookup[kind_name])
+    return actual_kinds
+
+
+def collect_input_dimensionality_offsets(
+    inputs: Dict[str, DynamicInputDefinition],
+) -> Dict[str, int]:
+    result = {}
+    for name, definition in inputs.items():
+        if definition.dimensionality_offset != 0:
+            result[name] = definition.dimensionality_offset
+    return result
+
+
+def assembly_manifest_class_methods(
+    block_type: str,
+    manifest_class: Type[BaseModel],
+    outputs_definitions: List[OutputDefinition],
+    manifest_description: ManifestDescription,
+) -> Type[WorkflowBlockManifest]:
+    describe_outputs = lambda cls: outputs_definitions
+    setattr(manifest_class, "describe_outputs", classmethod(describe_outputs))
+    setattr(manifest_class, "get_actual_outputs", describe_outputs)
+    accepts_batch_input = lambda cls: manifest_description.accepts_batch_input
+    setattr(manifest_class, "accepts_batch_input", classmethod(accepts_batch_input))
+    input_dimensionality_offsets = collect_input_dimensionality_offsets(
+        inputs=manifest_description.inputs
+    )
+    get_input_dimensionality_offsets = lambda cls: input_dimensionality_offsets
+    setattr(
+        manifest_class,
+        "get_input_dimensionality_offsets",
+        classmethod(get_input_dimensionality_offsets),
+    )
+    dimensionality_reference = pick_dimensionality_reference_property(
+        block_type=block_type,
+        inputs=manifest_description.inputs,
+    )
+    get_dimensionality_reference_property = lambda cls: dimensionality_reference
+    setattr(
+        manifest_class,
+        "get_dimensionality_reference_property",
+        classmethod(get_dimensionality_reference_property),
+    )
+    get_output_dimensionality_offset = (
+        lambda cls: manifest_description.output_dimensionality_offset
+    )
+    setattr(
+        manifest_class,
+        "get_output_dimensionality_offset",
+        classmethod(get_output_dimensionality_offset),
+    )
+    accepts_empty_values = lambda cls: manifest_description.accepts_empty_values
+    setattr(manifest_class, "accepts_empty_values", classmethod(accepts_empty_values))
+    return manifest_class
+
+
+def pick_dimensionality_reference_property(
+    block_type: str, inputs: Dict[str, DynamicInputDefinition]
+) -> Optional[str]:
+    references = []
+    for name, definition in inputs.items():
+        if definition.is_dimensionality_reference:
+            references.append(name)
+    if not references:
+        return None
+    if len(references) == 1:
+        return references[0]
+    raise DynamicBlockError(
+        public_message=f"For dynamic block {block_type} detected multiple inputs declared to be "
+        f"dimensionality reference: {references}, whereas at max one should be declared "
+        f"to be reference.",
+        context="workflow_compilation | dynamic_block_compilation | manifest_compilation",
+    )
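Editor's note: `build_input_field_metadata()` above branches on compound defaults because a mutable default shared across manifest instances would alias state; `default_factory` with a deep copy avoids that. A minimal self-contained sketch of the design choice (the manifest class and default are illustrative):

```python
from copy import deepcopy

from pydantic import BaseModel, Field

default_value = {"threshold": 0.5}  # hypothetical compound default


class ExampleManifest(BaseModel):
    # Mirrors the default_factory branch: each instance gets its own copy.
    config: dict = Field(default_factory=lambda: deepcopy(default_value))


a, b = ExampleManifest(), ExampleManifest()
a.config["threshold"] = 0.9
assert b.config["threshold"] == 0.5  # no state shared between instances
```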
diff --git a/inference/core/workflows/execution_engine/dynamic_blocks/block_scaffolding.py b/inference/core/workflows/execution_engine/dynamic_blocks/block_scaffolding.py
new file mode 100644
index 0000000000..1b1c8477c2
--- /dev/null
+++ b/inference/core/workflows/execution_engine/dynamic_blocks/block_scaffolding.py
@@ -0,0 +1,113 @@
+import types
+from typing import List, Type
+
+from inference.core.env import ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS
+from inference.core.workflows.errors import (
+    DynamicBlockError,
+    WorkflowEnvironmentConfigurationError,
+)
+from inference.core.workflows.execution_engine.dynamic_blocks.entities import PythonCode
+from inference.core.workflows.prototypes.block import (
+    BlockResult,
+    WorkflowBlock,
+    WorkflowBlockManifest,
+)
+
+IMPORTS_LINES = [
+    "from typing import Any, List, Dict, Set, Optional",
+    "import supervision as sv",
+    "import numpy as np",
+    "import math",
+    "import time",
+    "import json",
+    "import os",
+    "import requests",
+    "import cv2",
+    "import shapely",
+    "from inference.core.workflows.entities.base import Batch, WorkflowImageData",
+    "from inference.core.workflows.prototypes.block import BlockResult",
+]
+
+
+def assembly_custom_python_block(
+    block_type_name: str,
+    unique_identifier: str,
+    manifest: Type[WorkflowBlockManifest],
+    python_code: PythonCode,
+) -> Type[WorkflowBlock]:
+    code_module = create_dynamic_module(
+        block_type_name=block_type_name,
+        python_code=python_code,
+        module_name=f"dynamic_module_{unique_identifier}",
+    )
+    if not hasattr(code_module, python_code.run_function_name):
+        raise DynamicBlockError(
+            public_message=f"Cannot find function: {python_code.run_function_name} in declared code for "
+            f"dynamic block: `{block_type_name}`",
+            context="workflow_compilation | dynamic_block_compilation | declared_symbols_fetching",
+        )
+    run_function = getattr(code_module, python_code.run_function_name)
+
+    async def run(self, *args, **kwargs) -> BlockResult:
+        if not ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS:
+            raise WorkflowEnvironmentConfigurationError(
+                public_message="Cannot use dynamic blocks with custom Python code in this installation of `workflows`. "
+                "This can be changed by setting environmental variable "
+                "`ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS=True`",
+                context="workflow_execution | step_execution | dynamic_step",
+            )
+        return run_function(self, *args, **kwargs)
+
+    if python_code.init_function_code is not None and not hasattr(
+        code_module, python_code.init_function_name
+    ):
+        raise DynamicBlockError(
+            public_message=f"Cannot find function: {python_code.init_function_name} in declared code for "
+            f"dynamic block: `{block_type_name}`",
+            context="workflow_compilation | dynamic_block_compilation | declared_symbols_fetching",
+        )
+
+    init_function = getattr(code_module, python_code.init_function_name, dict)
+
+    def constructor(self):
+        self._init_results = init_function()
+
+    @classmethod
+    def get_init_parameters(cls) -> List[str]:
+        return []
+
+    @classmethod
+    def get_manifest(cls) -> Type[WorkflowBlockManifest]:
+        return manifest
+
+    return type(
+        f"DynamicBlock[{unique_identifier}]",
+        (WorkflowBlock,),
+        {
+            "__init__": constructor,
+            "get_init_parameters": get_init_parameters,
+            "get_manifest": get_manifest,
+            "run": run,
+        },
+    )
+
+
+def create_dynamic_module(
+    block_type_name: str, python_code: PythonCode, module_name: str
+) -> types.ModuleType:
+    imports = "\n".join(IMPORTS_LINES) + "\n" + "\n".join(python_code.imports) + "\n\n"
+    code = python_code.run_function_code
+    if python_code.init_function_code:
+        code += "\n\n" + python_code.init_function_code
+    code = imports + code
+    try:
+        dynamic_module = types.ModuleType(module_name)
+        exec(code, dynamic_module.__dict__)
+        return dynamic_module
+    except Exception as error:
+        raise DynamicBlockError(
+            public_message=f"Error of type `{error.__class__.__name__}` encountered while attempting to "
+            f"create Python module with code for block: {block_type_name}. Error message: {error}. Full code:\n{code}",
+            context="workflow_compilation | dynamic_block_compilation | dynamic_module_creation",
+            inner_error=error,
+        ) from error
diff --git a/inference/core/workflows/execution_engine/dynamic_blocks/entities.py b/inference/core/workflows/execution_engine/dynamic_blocks/entities.py
new file mode 100644
index 0000000000..7daee0ae35
--- /dev/null
+++ b/inference/core/workflows/execution_engine/dynamic_blocks/entities.py
@@ -0,0 +1,147 @@
+from enum import Enum
+from typing import Any, Dict, List, Literal, Optional
+
+from pydantic import BaseModel, Field
+
+
+class SelectorType(Enum):
+    INPUT_IMAGE = "input_image"
+    STEP_OUTPUT_IMAGE = "step_output_image"
+    INPUT_PARAMETER = "input_parameter"
+    STEP_OUTPUT = "step_output"
+
+
+class ValueType(Enum):
+    ANY = "any"
+    INTEGER = "integer"
+    FLOAT = "float"
+    BOOLEAN = "boolean"
+    DICT = "dict"
+    LIST = "list"
+    STRING = "string"
+
+
+class DynamicInputDefinition(BaseModel):
+    type: Literal["DynamicInputDefinition"]
+    has_default_value: bool = Field(
+        default=False,
+        description="Flag to decide if default value is provided for input",
+    )
+    default_value: Any = Field(
+        description="Definition of default value for a field. Use in combination with "
+        "`has_default_value` to decide on default value if field is optional.",
+        default=None,
+    )
+    is_optional: bool = Field(
+        description="Flag deciding if `default_value` will be added for manifest field annotation.",
+        default=False,
+    )
+    is_dimensionality_reference: bool = Field(
+        default=False,
+        description="Flag deciding if declared property holds dimensionality reference - see how "
+        "dimensionality works for statically defined blocks to discover meaning of the "
+        "parameter.",
+    )
+    dimensionality_offset: int = Field(
+        default=0,
+        ge=-1,
+        le=1,
+        description="Accepted dimensionality offset for parameter. Dimensionality works the same as for "
+        "traditional workflows blocks.",
+    )
+    selector_types: List[SelectorType] = Field(
+        default_factory=list,
+        description="Union of selector types accepted by input. Should be empty if field does not accept "
+        "selectors.",
+    )
+    selector_data_kind: Dict[SelectorType, List[str]] = Field(
+        default_factory=dict,
+        description="Mapping of `selector_types` into names of kinds to be compatible. "
+        "Empty dict (default value) means wildcard kind for all selectors. If name of kind given - "
+        "must be valid kind, known for workflow execution engine.",
+    )
+    value_types: List[ValueType] = Field(
+        default_factory=list,
+        description="List of types representing union of types for static values (non selectors) "
+        "that shall be accepted for input field. Empty list represents no value types allowed.",
+    )
+
+
+class DynamicOutputDefinition(BaseModel):
+    type: Literal["DynamicOutputDefinition"]
+    kind: List[str] = Field(
+        default_factory=list,
+        description="List representing union of kinds for defined output",
+    )
+
+
+class ManifestDescription(BaseModel):
+    type: Literal["ManifestDescription"]
+    block_type: str = Field(
+        description="Field holds type of the block to be dynamically created. Block can be initialised "
+        "as step using the type declared in the field."
+    )
+    inputs: Dict[str, DynamicInputDefinition] = Field(
+        description="Mapping name -> input definition for block inputs (parameters for run() function of "
+        "dynamic block)"
+    )
+    outputs: Dict[str, DynamicOutputDefinition] = Field(
+        default_factory=dict,
+        description="Mapping name -> output kind for block outputs.",
+    )
+    output_dimensionality_offset: int = Field(
+        default=0, ge=-1, le=1, description="Definition of output dimensionality offset"
+    )
+    accepts_batch_input: bool = Field(
+        default=False,
+        description="Flag to decide if function will be provided with batch data as whole or with singular "
+        "batch elements while execution",
+    )
+    accepts_empty_values: bool = Field(
+        default=False,
+        description="Flag to decide if empty (optional) values will be shipped as run() function parameters",
+    )
+
+
+class PythonCode(BaseModel):
+    type: Literal["PythonCode"]
+    run_function_code: str = Field(
+        description="Code of Python function. Content should be properly formatted including indentations. "
+        "Workflows execution engine is to create dynamic module with provided function - ensuring "
+        "imports of the following symbols: [Any, List, Dict, Set, sv, np, math, time, json, os, "
+        "requests, cv2, shapely, Batch, WorkflowImageData, BlockResult]. Expected signature is: "
+        "def run(self, ... # parameters of manifest apart from name and type). Through self, "
+        "one may access self._init_results which is dict returned by `init_code` if given."
+    )
+    run_function_name: str = Field(
+        default="run", description="Name of the function shipped in `run_function_code`."
+    )
+    init_function_code: Optional[str] = Field(
+        description="Code of the function to perform initialisation of the block. It must be "
+        "parameter-free function with signature `def init() -> Dict[str, Any]` setting "
+        "self._init_results on dynamic class initialisation",
+        default=None,
+    )
+    init_function_name: str = Field(
+        default="init",
+        description="Name of the `init_function_code` function.",
+    )
+    imports: List[str] = Field(
+        default_factory=list,
+        description="List of additional imports required to run the code",
+    )
+
+
+class DynamicBlockDefinition(BaseModel):
+    type: Literal["DynamicBlockDefinition"]
+    manifest: ManifestDescription = Field(
+        description="Definition of manifest for dynamic block to be created in runtime by "
+        "workflows execution engine."
+    )
+    code: PythonCode = Field(
+        description="Code to be executed in run(...) method of block that will be dynamically "
+        "created."
+    )
+
+
+BLOCK_SOURCE = "dynamic_workflows_blocks"
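Editor's note: a minimal dynamic block definition matching the entities above, illustrating the payload accepted by `compile_dynamic_blocks` and the new POST endpoint. The block name, input name and run body are hypothetical; everything else follows the schema in this file:

```python
# Hypothetical definition of a block counting detections in a step output.
example_definition = {
    "type": "DynamicBlockDefinition",
    "manifest": {
        "type": "ManifestDescription",
        "block_type": "DetectionsCounter",
        "inputs": {
            "predictions": {
                "type": "DynamicInputDefinition",
                "selector_types": ["step_output"],
            },
        },
        "outputs": {
            "result": {"type": "DynamicOutputDefinition", "kind": ["integer"]},
        },
    },
    "code": {
        "type": "PythonCode",
        # Indentation matters - the engine exec()s this source verbatim.
        "run_function_code": (
            "def run(self, predictions) -> BlockResult:\n"
            "    return {\"result\": len(predictions)}\n"
        ),
    },
}
```

Such a dict would be passed in the `dynamic_blocks_definitions` list of a workflow definition (or of a `DescribeBlocksRequest`), provided `ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS` is enabled.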
+ ) + run_function_name: str = Field( + default="run", description="Name of the function shipped in `function_code`." + ) + init_function_code: Optional[str] = Field( + description="Code of the function to perform initialisation of the block. It must be " + "parameter-free function with signature `def init() -> Dict[str, Any]` setting " + "self._init_results on dynamic class initialisation", + default=None, + ) + init_function_name: str = Field( + default="init", + description="Name of init_code function.", + ) + imports: List[str] = Field( + default_factory=list, + description="List of additional imports required to run the code", + ) + + +class DynamicBlockDefinition(BaseModel): + type: Literal["DynamicBlockDefinition"] + manifest: ManifestDescription = Field( + description="Definition of manifest for dynamic block to be created in runtime by " + "workflows execution engine." + ) + code: PythonCode = Field( + description="Code to be executed in run(...) method of block that will be dynamically " + "created." + ) + + +BLOCK_SOURCE = "dynamic_workflows_blocks" diff --git a/inference/core/workflows/execution_engine/introspection/blocks_loader.py b/inference/core/workflows/execution_engine/introspection/blocks_loader.py index ca29730a5e..f8b45e6433 100644 --- a/inference/core/workflows/execution_engine/introspection/blocks_loader.py +++ b/inference/core/workflows/execution_engine/introspection/blocks_loader.py @@ -4,7 +4,11 @@ from collections import Counter from typing import Any, Callable, Dict, List, Union -from inference.core.workflows.core_steps.loader import load_blocks +from inference.core.workflows.core_steps.loader import ( + REGISTERED_INITIALIZERS, + load_blocks, + load_kinds, +) from inference.core.workflows.entities.types import Kind from inference.core.workflows.errors import PluginInterfaceError, PluginLoadingError from inference.core.workflows.execution_engine.compiler.entities import ( @@ -14,38 +18,24 @@ BlockDescription, BlocksDescription, ) -from inference.core.workflows.execution_engine.introspection.schema_parser import ( - retrieve_selectors_from_schema, -) from inference.core.workflows.execution_engine.introspection.utils import ( build_human_friendly_block_name, get_full_type_name, ) +from inference.core.workflows.prototypes.block import WorkflowBlock WORKFLOWS_PLUGINS_ENV = "WORKFLOWS_PLUGINS" +WORKFLOWS_CORE_PLUGIN_NAME = "workflows_core" -def describe_available_blocks() -> BlocksDescription: - blocks = load_workflow_blocks() - declared_kinds = [] +def describe_available_blocks( + dynamic_blocks: List[BlockSpecification], +) -> BlocksDescription: + blocks = load_workflow_blocks() + dynamic_blocks result = [] for block in blocks: block_schema = block.manifest_class.model_json_schema() outputs_manifest = block.manifest_class.describe_outputs() - schema_selectors = retrieve_selectors_from_schema( - schema=block_schema, - inputs_dimensionality_offsets=block.manifest_class.get_input_dimensionality_offsets(), - dimensionality_reference_property=block.manifest_class.get_dimensionality_reference_property(), - ) - block_kinds = [ - k - for s in schema_selectors.values() - for r in s.allowed_references - for k in r.kind - ] - declared_kinds.extend(block_kinds) - for output in outputs_manifest: - declared_kinds.extend(output.kind) manifest_type_identifiers = get_manifest_type_identifiers( block_schema=block_schema, block_source=block.block_source, @@ -68,8 +58,7 @@ def describe_available_blocks() -> BlocksDescription: ) _validate_loaded_blocks_names_uniqueness(blocks=result) 
 _validate_loaded_blocks_manifest_type_identifiers(blocks=result)
-    declared_kinds = list(set(declared_kinds))
-    _validate_used_kinds_uniqueness(declared_kinds=declared_kinds)
+    declared_kinds = load_all_defined_kinds()
     return BlocksDescription(blocks=result, declared_kinds=declared_kinds)
@@ -110,14 +99,16 @@ def load_core_workflow_blocks() -> List[BlockSpecification]:
     already_spotted_blocks = set()
     result = []
     for block in core_blocks:
+        manifest_class = block.get_manifest()
+        identifier = get_full_type_name(selected_type=block)
         if block in already_spotted_blocks:
             continue
         result.append(
             BlockSpecification(
-                block_source="workflows_core",
-                identifier=get_full_type_name(selected_type=block),
+                block_source=WORKFLOWS_CORE_PLUGIN_NAME,
+                identifier=identifier,
                 block_class=block,
-                manifest_class=block.get_manifest(),
+                manifest_class=manifest_class,
             )
         )
         already_spotted_blocks.add(block)
@@ -132,13 +123,6 @@ def load_plugins_blocks() -> List[BlockSpecification]:
     return custom_blocks
 
 
-def get_plugin_modules() -> List[str]:
-    plugins_to_load = os.environ.get(WORKFLOWS_PLUGINS_ENV)
-    if plugins_to_load is None:
-        return []
-    return plugins_to_load.split(",")
-
-
 def load_blocks_from_plugin(plugin_name: str) -> List[BlockSpecification]:
     try:
         return _load_blocks_from_plugin(plugin_name=plugin_name)
@@ -163,7 +147,21 @@ def _load_blocks_from_plugin(plugin_name: str) -> List[BlockSpecification]:
     blocks = module.load_blocks()
     already_spotted_blocks = set()
     result = []
-    for block in blocks:
+    if not isinstance(blocks, list):
+        raise PluginInterfaceError(
+            public_message=f"Provided workflow plugin `{plugin_name}` implements `load_blocks()` "
+            f"incorrectly. Expected the function to return a list of `WorkflowBlock` subclasses, "
+            f"but got a value of type: {type(blocks)}.",
+            context="workflow_compilation | blocks_loading",
+        )
+    for i, block in enumerate(blocks):
+        if not isinstance(block, type) or not issubclass(block, WorkflowBlock):
+            raise PluginInterfaceError(
+                public_message=f"Provided workflow plugin `{plugin_name}` implements `load_blocks()` "
+                f"incorrectly. Expected the function to return a list of `WorkflowBlock` subclasses, "
+                f"but found entry {block} at position {i}.",
+                context="workflow_compilation | blocks_loading",
+            )
         if block in already_spotted_blocks:
             continue
         result.append(
@@ -179,15 +177,20 @@ def _load_blocks_from_plugin(plugin_name: str) -> List[BlockSpecification]:
 
 
 def load_initializers() -> Dict[str, Union[Any, Callable[[None], Any]]]:
-    plugins_to_load = os.environ.get(WORKFLOWS_PLUGINS_ENV)
-    if plugins_to_load is None:
-        return {}
-    result = {}
-    for plugin_name in plugins_to_load.split(","):
+    plugins_to_load = get_plugin_modules()
+    result = load_core_blocks_initializers()
+    for plugin_name in plugins_to_load:
         result.update(load_initializers_from_plugin(plugin_name=plugin_name))
     return result
 
 
+def load_core_blocks_initializers() -> Dict[str, Union[Any, Callable[[None], Any]]]:
+    return {
+        f"{WORKFLOWS_CORE_PLUGIN_NAME}.{parameter_name}": initializer
+        for parameter_name, initializer in REGISTERED_INITIALIZERS.items()
+    }
+
+
 def load_initializers_from_plugin(
     plugin_name: str,
 ) -> Dict[str, Union[Any, Callable[[None], Any]]]:
@@ -265,3 +268,68 @@ def _validate_used_kinds_uniqueness(declared_kinds: List[Kind]) -> None:
             f"the same name.",
             context="workflow_compilation | blocks_loading",
         )
+
+
+def load_all_defined_kinds() -> List[Kind]:
+    core_blocks_kinds = load_kinds()
+    plugins_kinds = load_plugins_kinds()
+    declared_kinds = core_blocks_kinds + plugins_kinds
+    declared_kinds = list(set(declared_kinds))
+    _validate_used_kinds_uniqueness(declared_kinds=declared_kinds)
+    return declared_kinds
+
+
+def load_plugins_kinds() -> List[Kind]:
+    plugins_to_load = get_plugin_modules()
+    result = []
+    for plugin_name in plugins_to_load:
+        result.extend(load_plugin_kinds(plugin_name=plugin_name))
+    return result
+
+
+def load_plugin_kinds(plugin_name: str) -> List[Kind]:
+    try:
+        return _load_plugin_kinds(plugin_name=plugin_name)
+    except ImportError as e:
+        raise PluginLoadingError(
+            public_message=f"It is not possible to load kinds from workflow plugin `{plugin_name}`. "
+            f"Make sure the library providing the custom steps is correctly installed in the Python environment.",
+            context="workflow_compilation | blocks_loading",
+            inner_error=e,
+        ) from e
+    except AttributeError as e:
+        raise PluginInterfaceError(
+            public_message=f"Provided workflow plugin `{plugin_name}` does not implement the blocks "
+            f"loading interface correctly and cannot be loaded.",
+            context="workflow_compilation | blocks_loading",
+            inner_error=e,
+        ) from e
+
+
+def _load_plugin_kinds(plugin_name: str) -> List[Kind]:
+    module = importlib.import_module(plugin_name)
+    if not hasattr(module, "load_kinds"):
+        return []
+    kinds_extractor = getattr(module, "load_kinds")
+    if not callable(kinds_extractor):
+        logging.warning(
+            f"Found `load_kinds` symbol in plugin `{plugin_name}` module init, but it is not callable. "
+            f"Not importing kinds from that plugin."
+        )
+        return []
+    kinds = kinds_extractor()
+    if not isinstance(kinds, list) or not all(isinstance(e, Kind) for e in kinds):
+        raise PluginInterfaceError(
+            public_message=f"Provided workflow plugin `{plugin_name}` does not implement the blocks "
+            f"loading interface correctly and cannot be loaded. Return value of `load_kinds()` "
+            f"is not a list of `Kind` objects.",
+            context="workflow_compilation | blocks_loading",
+        )
+    return kinds
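+
+
+# For reference, a plugin module compatible with the loader above could look like
+# the sketch below (illustrative only - `MyBlock` and `my_kind` are hypothetical
+# names; the module just needs top-level `load_blocks()` and, optionally,
+# `load_kinds()`, and must be importable under a name listed in the
+# WORKFLOWS_PLUGINS environment variable):
+#
+#     from typing import List, Type
+#
+#     from inference.core.workflows.entities.types import Kind
+#     from inference.core.workflows.prototypes.block import WorkflowBlock
+#
+#     MY_KIND = Kind(name="my_kind")
+#
+#     def load_blocks() -> List[Type[WorkflowBlock]]:
+#         return [MyBlock]  # MyBlock: your WorkflowBlock subclass
+#
+#     def load_kinds() -> List[Kind]:
+#         return [MY_KIND]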
+
+
+def get_plugin_modules() -> List[str]:
+    plugins_to_load = os.environ.get(WORKFLOWS_PLUGINS_ENV)
+    if plugins_to_load is None:
+        return []
+    return plugins_to_load.split(",")
diff --git a/inference/core/workflows/execution_engine/introspection/entities.py b/inference/core/workflows/execution_engine/introspection/entities.py
index f906d9514f..d86ae00e84 100644
--- a/inference/core/workflows/execution_engine/introspection/entities.py
+++ b/inference/core/workflows/execution_engine/introspection/entities.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import Dict, List, Optional, Set, Type
+from typing import Callable, Dict, List, Optional, Set, Type, Union
 
 from pydantic import BaseModel, Field
 
@@ -90,7 +90,12 @@ class DiscoveredConnections:
 
 
 class BlockDescription(BaseModel):
-    manifest_class: Type[WorkflowBlockManifest] = Field(exclude=True)
+    manifest_class: Union[Type[WorkflowBlockManifest], Type[BaseModel]] = Field(
+        exclude=True
+    )
+    # Type[BaseModel] is here to let dynamic blocks (which are plain BaseModel subclasses)
+    # pass validation - that should be the only use of this type in the field. Dynamic blocks
+    # implement the same interface, yet due to the dynamic nature of their creation they
+    # cannot subclass the abstract WorkflowBlockManifest.
     block_class: Type[WorkflowBlock] = Field(exclude=True)
     block_schema: dict = Field(
         description="OpenAPI specification of block manifest that "
diff --git a/inference/core/workflows/prototypes/block.py b/inference/core/workflows/prototypes/block.py
index d143465d78..667c55c3df 100644
--- a/inference/core/workflows/prototypes/block.py
+++ b/inference/core/workflows/prototypes/block.py
@@ -1,10 +1,9 @@
 from abc import ABC, abstractmethod
-from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union
+from typing import Any, Dict, List, Optional, Type, Union
 
 from openai import BaseModel
 from pydantic import ConfigDict, Field
 
-from inference.core import logger
 from inference.core.workflows.entities.base import OutputDefinition
 from inference.core.workflows.entities.types import FlowControl
 from inference.core.workflows.errors import BlockInterfaceError
diff --git a/tests/inference/hosted_platform_tests/conftest.py b/tests/inference/hosted_platform_tests/conftest.py
index 0d9ef9af0c..43bc043592 100644
--- a/tests/inference/hosted_platform_tests/conftest.py
+++ b/tests/inference/hosted_platform_tests/conftest.py
@@ -249,6 +249,3 @@ def retry_at_max_n_times(function: callable, n: int, function_description: str)
             return None
         attempts += 1
     raise Exception(f"Could not achieve success of {function_description}")
-
-
-
diff --git a/tests/inference/hosted_platform_tests/test_workflows.py b/tests/inference/hosted_platform_tests/test_workflows.py
index f5beeb4015..0a34381f27 100644
--- a/tests/inference/hosted_platform_tests/test_workflows.py
+++ b/tests/inference/hosted_platform_tests/test_workflows.py
@@ -5,7 +5,9 @@
 
 
 @pytest.mark.flaky(retries=4, delay=1)
-def test_getting_schemas(object_detection_service_url: str) -> None:
+def test_getting_schemas_from_legacy_get_endpoint(
+    object_detection_service_url: str,
+) -> None:
     # when
     response = requests.get(f"{object_detection_service_url}/workflows/blocks/describe")
 
@@ -18,6 +20,37 @@ def test_getting_schemas(object_detection_service_url: str) -> None:
         "kinds_connections",
         "primitives_connections",
         "universal_query_language_description",
+        "dynamic_block_definition_schema",
+    }
+    assert len(response_data["blocks"]) > 0, "Some blocks expected to be added"
+    assert len(response_data["declared_kinds"]) > 0, "Some kinds must be declared"
+    assert len(response_data["declared_kinds"]) >= len(
+        response_data["kinds_connections"]
+    ), "Kinds connections declared as inputs for blocks must be at most in number of all declared kinds"
+    assert (
+        len(response_data["primitives_connections"]) > 0
+    ), "Expected some primitive parameters for steps to be declared"
+
+
+@pytest.mark.flaky(retries=4, delay=1)
+def test_getting_schemas_from_new_post_endpoint(
+    object_detection_service_url: str,
+) -> None:
+    # when
+    response = requests.post(
+        f"{object_detection_service_url}/workflows/blocks/describe"
+    )
+
+    # then
+    response.raise_for_status()
+    response_data = response.json()
+    assert set(response_data.keys()) == {
+        "blocks",
+        "declared_kinds",
+        "kinds_connections",
+        "primitives_connections",
+        "universal_query_language_description",
+        "dynamic_block_definition_schema",
+    }
     assert len(response_data["blocks"]) > 0, "Some blocs expected to be added"
     assert len(response_data["declared_kinds"]) > 0, "Some kinds must be declared"
     assert len(response_data["declared_kinds"]) >= len(
         response_data["kinds_connections"]
     ), "Kinds connections declared as inputs for blocks must be at most in number of all declared kinds"
     assert (
         len(response_data["primitives_connections"]) > 0
     ), "Expected some primitive parameters for steps to be declared"
@@ -29,6 +62,80 @@ def test_getting_schemas(object_detection_service_url: str) -> None:
     ), "Expected some primitive parameters for steps to be declared"
+
+
+FUNCTION = """
+def my_function(self, prediction: sv.Detections, crops: Batch[WorkflowImageData]) -> BlockResult:
+    detection_id2bbox = {
+        detection_id.item(): i for i, detection_id in enumerate(prediction.data["detection_id"])
+    }
+    results = []
+    for crop in crops:
+        parent_id = crop.parent_metadata.parent_id
+        results.append({"associated_detections": prediction[detection_id2bbox[parent_id]]})
+    return results
+ """
+DYNAMIC_BLOCKS_DEFINITION = [
+    {
+        "type": "DynamicBlockDefinition",
+        "manifest": {
+            "type": "ManifestDescription",
+            "block_type": "DetectionsToCropsAssociation",
+            "inputs": {
+                "prediction": {
+                    "type": "DynamicInputDefinition",
+                    "selector_types": ["step_output"],
+                    "selector_data_kind": {
+                        "step_output": [
+                            "Batch[object_detection_prediction]",
+                            "Batch[instance_segmentation_prediction]",
+                            "Batch[keypoint_detection_prediction]",
+                        ]
+                    },
+                },
+                "crops": {
+                    "type": "DynamicInputDefinition",
+                    "selector_types": ["step_output_image"],
+                    "is_dimensionality_reference": True,
+                    "dimensionality_offset": 1,
+                },
+            },
+            "outputs": {
+                "associated_detections": {
+                    "type": "DynamicOutputDefinition",
+                    "kind": [
+                        "Batch[object_detection_prediction]",
+                        "Batch[instance_segmentation_prediction]",
+                        "Batch[keypoint_detection_prediction]",
+                    ],
+                }
+            },
+        },
+        "code": {
+            "type": "PythonCode",
+            "run_function_code": FUNCTION,
+            "run_function_name": "my_function",
+        },
+    },
+]
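+
+
+# The hosted platform is expected to run with custom Python execution disabled
+# (ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS=False), so describing blocks with the
+# definition above should be rejected rather than evaluated.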
"dynamic_block_definition_schema", + } + assert len(response_data["blocks"]) > 0, "Some blocs expected to be added" + assert len(response_data["declared_kinds"]) > 0, "Some kinds must be declared" + assert len(response_data["declared_kinds"]) >= len( + response_data["kinds_connections"] + ), "Kinds connections declared as inputs for blocks must be at most in number of all declared kinds" + assert ( + len(response_data["primitives_connections"]) > 0 + ), "Expected some primitive parameters for steps to be declared" + + +@pytest.mark.flaky(retries=4, delay=1) +def test_getting_schemas_from_new_post_endpoint( + object_detection_service_url: str, +) -> None: + # when + response = requests.post( + f"{object_detection_service_url}/workflows/blocks/describe" + ) + + # then + response.raise_for_status() + response_data = response.json() + assert set(response_data.keys()) == { + "blocks", + "declared_kinds", + "kinds_connections", + "primitives_connections", + "universal_query_language_description", + "dynamic_block_definition_schema", } assert len(response_data["blocks"]) > 0, "Some blocs expected to be added" assert len(response_data["declared_kinds"]) > 0, "Some kinds must be declared" @@ -29,6 +62,80 @@ def test_getting_schemas(object_detection_service_url: str) -> None: ), "Expected some primitive parameters for steps to be declared" +FUNCTION = """ +def my_function(self, prediction: sv.Detections, crops: Batch[WorkflowImageData]) -> BlockResult: + detection_id2bbox = { + detection_id.item(): i for i, detection_id in enumerate(prediction.data["detection_id"]) + } + results = [] + for crop in crops: + parent_id = crop.parent_metadata.parent_id + results.append({"associated_detections": prediction[detection_id2bbox[parent_id]]}) + return results + """ +DYNAMIC_BLOCKS_DEFINITION = [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "DetectionsToCropsAssociation", + "inputs": { + "prediction": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output"], + "selector_data_kind": { + "step_output": [ + "Batch[object_detection_prediction]", + "Batch[instance_segmentation_prediction]", + "Batch[keypoint_detection_prediction]", + ] + }, + }, + "crops": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output_image"], + "is_dimensionality_reference": True, + "dimensionality_offset": 1, + }, + }, + "outputs": { + "associated_detections": { + "type": "DynamicOutputDefinition", + "kind": [ + "Batch[object_detection_prediction]", + "Batch[instance_segmentation_prediction]", + "Batch[keypoint_detection_prediction]", + ], + } + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": FUNCTION, + "run_function_name": "my_function", + }, + }, +] + + +@pytest.mark.flaky(retries=4, delay=1) +def test_getting_schemas_from_new_post_endpoint_with_dynamic_blocks( + object_detection_service_url: str, +) -> None: + # when + response = requests.post( + f"{object_detection_service_url}/workflows/blocks/describe", + json={"dynamic_blocks_definitions": DYNAMIC_BLOCKS_DEFINITION}, + ) + + # then + assert response.status_code == 500 + response_data = response.json() + assert ( + "Cannot use dynamic blocks with custom Python code" in response_data["message"] + ), "Expected execution to be prevented" + + @pytest.mark.flaky(retries=4, delay=1) def test_getting_dynamic_outputs(object_detection_service_url: str) -> None: # when @@ -410,7 +517,7 @@ def test_ocr_workflow_run_when_run_expected_to_succeed( @pytest.mark.flaky(retries=4, 
delay=1) def test_yolo_world_workflow_run_when_run_expected_to_succeed( - object_detection_service_url: str, detection_model_id: str + object_detection_service_url: str, ) -> None: # when response = requests.post( @@ -440,3 +547,115 @@ def test_yolo_world_workflow_run_when_run_expected_to_succeed( assert ( len(response_data["outputs"]) == 2 ), "Two images submitted - two response expected" + + +FUNCTION_TO_GET_MAXIMUM_CONFIDENCE_FROM_BATCH_OF_DETECTIONS = """ +def run(self, predictions: Batch[sv.Detections]) -> BlockResult: + result = [] + for prediction in predictions: + result.append({"max_confidence": np.max(prediction.confidence).item()}) + return result +""" + +WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_ON_BATCH = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + {"type": "WorkflowParameter", "name": "model_id"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "MaxConfidence", + "inputs": { + "predictions": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output"], + }, + }, + "outputs": { + "max_confidence": { + "type": "DynamicOutputDefinition", + "kind": ["float_zero_to_one"], + } + }, + "accepts_batch_input": True, + }, + "code": { + "type": "PythonCode", + "run_function_code": FUNCTION_TO_GET_MAXIMUM_CONFIDENCE_FROM_BATCH_OF_DETECTIONS, + }, + }, + ], + "steps": [ + { + "type": "RoboflowObjectDetectionModel", + "name": "model", + "image": "$inputs.image", + "model_id": "$inputs.model_id", + }, + { + "type": "MaxConfidence", + "name": "confidence_aggregation", + "predictions": "$steps.model.predictions", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "max_confidence", + "selector": "$steps.confidence_aggregation.max_confidence", + }, + ], +} + + +@pytest.mark.flaky(retries=4, delay=1) +def test_workflow_run_with_dynamic_blocks( + object_detection_service_url: str, detection_model_id: str +) -> None: + # when + response = requests.post( + f"{object_detection_service_url}/workflows/run", + json={ + "specification": WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_ON_BATCH, + "api_key": ROBOFLOW_API_KEY, + "inputs": { + "image": [ + { + "type": "url", + "value": "https://media.roboflow.com/fruit.png", + } + ] + * 2, + "model_id": detection_model_id, + }, + }, + ) + + # then + assert response.status_code == 500 + response_data = response.json() + assert ( + "Cannot use dynamic blocks with custom Python code" in response_data["message"] + ), "Expected execution to be prevented" + + +@pytest.mark.flaky(retries=4, delay=1) +def test_workflow_validate_with_dynamic_blocks( + object_detection_service_url: str, detection_model_id: str +) -> None: + # when + response = requests.post( + f"{object_detection_service_url}/workflows/validate", + json=WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_ON_BATCH, + ) + + # then + assert response.status_code == 500 + response_data = response.json() + assert ( + "Cannot use dynamic blocks with custom Python code" in response_data["message"] + ), "Expected execution to be prevented" diff --git a/tests/inference/integration_tests/test_workflow_endpoints.py b/tests/inference/integration_tests/test_workflow_endpoints.py index e1d8213c88..fe402c98b2 100644 --- a/tests/inference/integration_tests/test_workflow_endpoints.py +++ b/tests/inference/integration_tests/test_workflow_endpoints.py @@ -5,7 +5,7 @@ API_KEY = os.environ.get("API_KEY") -def test_getting_blocks_descriptions(server_url) -> None: +def 
test_getting_blocks_descriptions_using_legacy_get_endpoint(server_url) -> None: # when response = requests.get(f"{server_url}/workflows/blocks/describe") @@ -30,6 +30,216 @@ def test_getting_blocks_descriptions(server_url) -> None: assert ( len(response_data["primitives_connections"]) > 0 ), "Expected some primitive parameters for steps to be declared" + assert ( + "universal_query_language_description" in response_data + ), "Expected universal_query_language_description key to be present in response" + assert ( + "dynamic_block_definition_schema" in response_data + ), "Expected key `dynamic_block_definition_schema` to be present in response" + + +def test_getting_blocks_descriptions_using_new_post_endpoint(server_url) -> None: + # when + response = requests.post(f"{server_url}/workflows/blocks/describe") + + # then + response.raise_for_status() + response_data = response.json() + assert "blocks" in response_data, "Response expected to define blocks" + assert len(response_data["blocks"]) > 0, "Some blocs expected to be added" + assert ( + "declared_kinds" in response_data + ), "Declared kinds must be provided in output" + assert len(response_data["declared_kinds"]) > 0, "Some kinds must be declared" + assert ( + "kinds_connections" in response_data + ), "Kinds connections expected to be declared" + assert len(response_data["declared_kinds"]) >= len( + response_data["kinds_connections"] + ), "Kinds connections declared as inputs for blocks must be at most in number of all declared kinds" + assert ( + "primitives_connections" in response_data + ), "Primitives connections expected to be in response" + assert ( + len(response_data["primitives_connections"]) > 0 + ), "Expected some primitive parameters for steps to be declared" + assert ( + "universal_query_language_description" in response_data + ), "Expected universal_query_language_description key to be present in response" + assert ( + "dynamic_block_definition_schema" in response_data + ), "Expected key `dynamic_block_definition_schema` to be present in response" + + +def test_getting_blocks_descriptions_using_new_post_endpoint_with_dynamic_steps( + server_url, +) -> None: + # given + function_code = """ +def my_function(self, prediction: sv.Detections, crops: Batch[WorkflowImageData]) -> BlockResult: + detection_id2bbox = { + detection_id.item(): i for i, detection_id in enumerate(prediction.data["detection_id"]) + } + results = [] + for crop in crops: + parent_id = crop.parent_metadata.parent_id + results.append({"associated_detections": prediction[detection_id2bbox[parent_id]]}) + return results + """ + dynamic_blocks_definitions = [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "DetectionsToCropsAssociation", + "inputs": { + "prediction": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output"], + "selector_data_kind": { + "step_output": [ + "Batch[object_detection_prediction]", + "Batch[instance_segmentation_prediction]", + "Batch[keypoint_detection_prediction]", + ] + }, + }, + "crops": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output_image"], + "is_dimensionality_reference": True, + "dimensionality_offset": 1, + }, + }, + "outputs": { + "associated_detections": { + "type": "DynamicOutputDefinition", + "kind": [ + "Batch[object_detection_prediction]", + "Batch[instance_segmentation_prediction]", + "Batch[keypoint_detection_prediction]", + ], + } + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": function_code, + 
"run_function_name": "my_function", + }, + }, + ] + + # when + response = requests.post( + f"{server_url}/workflows/blocks/describe", + json={"dynamic_blocks_definitions": dynamic_blocks_definitions}, + ) + + # then + response.raise_for_status() + response_data = response.json() + assert "blocks" in response_data, "Response expected to define blocks" + assert len(response_data["blocks"]) > 0, "Some blocs expected to be added" + assert ( + "declared_kinds" in response_data + ), "Declared kinds must be provided in output" + assert len(response_data["declared_kinds"]) > 0, "Some kinds must be declared" + assert ( + "kinds_connections" in response_data + ), "Kinds connections expected to be declared" + assert len(response_data["declared_kinds"]) >= len( + response_data["kinds_connections"] + ), "Kinds connections declared as inputs for blocks must be at most in number of all declared kinds" + assert ( + "primitives_connections" in response_data + ), "Primitives connections expected to be in response" + assert ( + len(response_data["primitives_connections"]) > 0 + ), "Expected some primitive parameters for steps to be declared" + assert ( + "universal_query_language_description" in response_data + ), "Expected universal_query_language_description key to be present in response" + assert ( + "dynamic_block_definition_schema" in response_data + ), "Expected key `dynamic_block_definition_schema` to be present in response" + types_compatible_with_object_detection_predictions = { + e["manifest_type_identifier"] + for e in response_data["kinds_connections"][ + "Batch[object_detection_prediction]" + ] + } + assert ( + "DetectionsToCropsAssociation" + in types_compatible_with_object_detection_predictions + ), "Expected dynamic block to be manifested in connections" + + +def test_getting_blocks_descriptions_using_new_post_endpoint_with_dynamic_steps_when_steps_are_malformed( + server_url, +) -> None: + # given + function_code = """ +def my_function(self, prediction: sv.Detections, crops: Batch[WorkflowImageData]) -> BlockResult: + pass + """ + dynamic_blocks_definitions = [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "DetectionsToCropsAssociation", + "inputs": { + "prediction": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output"], + "is_dimensionality_reference": True, + "selector_data_kind": { + "step_output": [ + "Batch[object_detection_prediction]", + "Batch[instance_segmentation_prediction]", + "Batch[keypoint_detection_prediction]", + ] + }, + }, + "crops": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output_image"], + "is_dimensionality_reference": True, + "dimensionality_offset": 1, + }, + }, + "outputs": { + "associated_detections": { + "type": "DynamicOutputDefinition", + "kind": [ + "Batch[object_detection_prediction]", + "Batch[instance_segmentation_prediction]", + "Batch[keypoint_detection_prediction]", + ], + } + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": function_code, + "run_function_name": "my_function", + }, + }, + ] + + # when + response = requests.post( + f"{server_url}/workflows/blocks/describe", + json={"dynamic_blocks_definitions": dynamic_blocks_definitions}, + ) + + # then + assert response.status_code == 400, "Expected bad request to be manifested" + response_data = response.json() + assert ( + "dimensionality reference" in response_data["message"] + ), "Expected the cause of problem being dimensionality reference declaration" def 
test_getting_dynamic_outputs(server_url: str) -> None: @@ -100,6 +310,86 @@ def test_compilation_endpoint_when_compilation_succeeds( assert response_data["status"] == "ok" +def test_compilation_endpoint_when_compilation_succeeds_with_custom_block( + server_url: str, +) -> None: + # given + init_function = """ +def init_model() -> Dict[str, Any]: + model = YOLOv8ObjectDetection(model_id="yolov8n-640") + return {"model": model} +""" + infer_function = """ +def infer(self, image: WorkflowImageData) -> BlockResult: + predictions = self._init_results["model"].infer(image.numpy_image) + return {"predictions": sv.Detections.from_inference(predictions[0].model_dump(by_alias=True, exclude_none=True))} +""" + valid_workflow_definition = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "CustomModel", + "inputs": { + "image": { + "type": "DynamicInputDefinition", + "selector_types": ["input_image"], + }, + }, + "outputs": { + "predictions": { + "type": "DynamicOutputDefinition", + "kind": [ + "Batch[object_detection_prediction]", + ], + } + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": infer_function, + "run_function_name": "infer", + "init_function_code": init_function, + "init_function_name": "init_model", + "imports": [ + "from inference.models.yolov8 import YOLOv8ObjectDetection", + ], + }, + }, + ], + "steps": [ + { + "type": "CustomModel", + "name": "model", + "image": "$inputs.image", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "predictions", + "selector": "$steps.model.predictions", + }, + ], + } + + # when + response = requests.post( + f"{server_url}/workflows/validate", + json=valid_workflow_definition, + ) + + # then + response.raise_for_status() + response_data = response.json() + assert response_data["status"] == "ok" + + def test_compilation_endpoint_when_compilation_fails( server_url: str, ) -> None: @@ -142,7 +432,7 @@ def test_compilation_endpoint_when_compilation_fails( def test_workflow_run( server_url: str, - clean_loaded_models_fixture + clean_loaded_models_fixture, ) -> None: # given valid_workflow_definition = { @@ -204,3 +494,104 @@ def test_workflow_run( assert ( len(response_data["outputs"][1]["result"]["predictions"]) == 6 ), "Expected to see 6 predictions" + + +FUNCTION_TO_GET_MAXIMUM_CONFIDENCE_FROM_BATCH_OF_DETECTIONS = """ +def run(self, predictions: Batch[sv.Detections]) -> BlockResult: + result = [] + for prediction in predictions: + result.append({"max_confidence": np.max(prediction.confidence).item()}) + return result +""" + +WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_ON_BATCH = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "MaxConfidence", + "inputs": { + "predictions": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output"], + }, + }, + "outputs": { + "max_confidence": { + "type": "DynamicOutputDefinition", + "kind": ["float_zero_to_one"], + } + }, + "accepts_batch_input": True, + }, + "code": { + "type": "PythonCode", + "run_function_code": FUNCTION_TO_GET_MAXIMUM_CONFIDENCE_FROM_BATCH_OF_DETECTIONS, + }, + }, + ], + "steps": [ + { + "type": "RoboflowObjectDetectionModel", + "name": "model", + "image": "$inputs.image", + "model_id": "yolov8n-640", + }, + { + "type": 
"MaxConfidence", + "name": "confidence_aggregation", + "predictions": "$steps.model.predictions", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "max_confidence", + "selector": "$steps.confidence_aggregation.max_confidence", + }, + ], +} + + +def test_workflow_run_when_dynamic_block_is_in_use( + server_url: str, + clean_loaded_models_fixture, +) -> None: + # when + response = requests.post( + f"{server_url}/workflows/run", + json={ + "specification": WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_ON_BATCH, + "api_key": API_KEY, + "inputs": { + "image": [ + { + "type": "url", + "value": "https://media.roboflow.com/fruit.png", + } + ] + * 2, + }, + }, + ) + + # then + response.raise_for_status() + response_data = response.json() + assert isinstance( + response_data["outputs"], list + ), "Expected list of elements to be returned" + assert ( + len(response_data["outputs"]) == 2 + ), "Two images submitted - two responses expected" + assert set(response_data["outputs"][0].keys()) == { + "max_confidence" + }, "Expected only `max_confidence` output" + assert set(response_data["outputs"][1].keys()) == { + "max_confidence" + }, "Expected only `max_confidence` output" diff --git a/tests/workflows/integration_tests/execution/test_workflow_with_custom_python_block.py b/tests/workflows/integration_tests/execution/test_workflow_with_custom_python_block.py new file mode 100644 index 0000000000..d74642e2d9 --- /dev/null +++ b/tests/workflows/integration_tests/execution/test_workflow_with_custom_python_block.py @@ -0,0 +1,926 @@ +from unittest import mock + +import numpy as np +import pytest + +from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS +from inference.core.managers.base import ModelManager +from inference.core.workflows.core_steps.common.entities import StepExecutionMode +from inference.core.workflows.errors import ( + DynamicBlockError, + WorkflowEnvironmentConfigurationError, +) +from inference.core.workflows.execution_engine.core import ExecutionEngine +from inference.core.workflows.execution_engine.dynamic_blocks import block_assembler + +FUNCTION_TO_GET_OVERLAP_OF_BBOXES = """ +def run(self, predictions: sv.Detections, class_x: str, class_y: str) -> BlockResult: + bboxes_class_x = predictions[predictions.data["class_name"] == class_x] + bboxes_class_y = predictions[predictions.data["class_name"] == class_y] + overlap = [] + for bbox_x in bboxes_class_x: + bbox_x_coords = bbox_x[0] + bbox_overlaps = [] + for bbox_y in bboxes_class_y: + if bbox_y[-1]["detection_id"] == bbox_x[-1]["detection_id"]: + continue + bbox_y_coords = bbox_y[0] + x_min = max(bbox_x_coords[0], bbox_y_coords[0]) + y_min = max(bbox_x_coords[1], bbox_y_coords[1]) + x_max = min(bbox_x_coords[2], bbox_y_coords[2]) + y_max = min(bbox_x_coords[3], bbox_y_coords[3]) + # compute the area of intersection rectangle + intersection_area = max(0, x_max - x_min + 1) * max(0, y_max - y_min + 1) + box_x_area = (bbox_x_coords[2] - bbox_x_coords[0] + 1) * (bbox_x_coords[3] - bbox_x_coords[1] + 1) + local_overlap = intersection_area / (box_x_area + 1e-5) + bbox_overlaps.append(local_overlap) + overlap.append(bbox_overlaps) + return {"overlap": overlap} +""" + + +FUNCTION_TO_GET_MAXIMUM_OVERLAP = """ +def run(self, overlaps: List[List[float]]) -> BlockResult: + max_value = -1 + for overlap in overlaps: + for overlap_value in overlap: + if not max_value: + max_value = overlap_value + else: + max_value = max(max_value, overlap_value) + return {"max_value": max_value} +""" + +WORKFLOW_WITH_OVERLAP_MEASUREMENT = { + "version": "1.0", 
+ "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "OverlapMeasurement", + "inputs": { + "predictions": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output"], + }, + "class_x": { + "type": "DynamicInputDefinition", + "value_types": ["string"], + }, + "class_y": { + "type": "DynamicInputDefinition", + "value_types": ["string"], + }, + }, + "outputs": {"overlap": {"type": "DynamicOutputDefinition", "kind": []}}, + }, + "code": { + "type": "PythonCode", + "run_function_code": FUNCTION_TO_GET_OVERLAP_OF_BBOXES, + }, + }, + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "MaximumOverlap", + "inputs": { + "overlaps": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output"], + }, + }, + "outputs": { + "max_value": {"type": "DynamicOutputDefinition", "kind": []} + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": FUNCTION_TO_GET_MAXIMUM_OVERLAP, + }, + }, + ], + "steps": [ + { + "type": "RoboflowObjectDetectionModel", + "name": "model", + "image": "$inputs.image", + "model_id": "yolov8n-640", + }, + { + "type": "OverlapMeasurement", + "name": "overlap_measurement", + "predictions": "$steps.model.predictions", + "class_x": "dog", + "class_y": "dog", + }, + { + "type": "ContinueIf", + "name": "continue_if", + "condition_statement": { + "type": "StatementGroup", + "statements": [ + { + "type": "BinaryStatement", + "left_operand": { + "type": "DynamicOperand", + "operand_name": "overlaps", + "operations": [{"type": "SequenceLength"}], + }, + "comparator": {"type": "(Number) >="}, + "right_operand": { + "type": "StaticOperand", + "value": 1, + }, + } + ], + }, + "evaluation_parameters": {"overlaps": "$steps.overlap_measurement.overlap"}, + "next_steps": ["$steps.maximum_overlap"], + }, + { + "type": "MaximumOverlap", + "name": "maximum_overlap", + "overlaps": "$steps.overlap_measurement.overlap", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "overlaps", + "selector": "$steps.overlap_measurement.overlap", + }, + { + "type": "JsonField", + "name": "max_overlap", + "selector": "$steps.maximum_overlap.max_value", + }, + ], +} + + +@pytest.mark.asyncio +async def test_workflow_with_custom_python_blocks_measuring_overlap( + model_manager: ModelManager, + dogs_image: np.ndarray, + crowd_image: np.ndarray, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + workflow_definition=WORKFLOW_WITH_OVERLAP_MEASUREMENT, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + # when + result = await execution_engine.run_async( + runtime_parameters={ + "image": [dogs_image, crowd_image], + } + ) + + # then + assert isinstance(result, list), "Expected list to be delivered" + assert len(result) == 2, "Expected 2 elements in the output for two input images" + assert set(result[0].keys()) == { + "overlaps", + "max_overlap", + }, "Expected all declared outputs to be delivered" + assert set(result[1].keys()) == { + "overlaps", + "max_overlap", + }, "Expected all declared outputs to be delivered" + assert ( + len(result[0]["overlaps"]) == 2 + ), "Expected 2 instances of dogs found, each overlap with another for 
first image" + assert ( + abs(result[0]["max_overlap"] - 0.177946) < 1e-3 + ), "Expected max overlap to be calculated properly" + assert ( + len(result[1]["overlaps"]) == 0 + ), "Expected no instances of dogs found for second image" + assert ( + result[1]["max_overlap"] is None + ), "Expected `max_overlap` not to be calculated for second image due to conditional execution" + + +FUNCTION_TO_GET_MAXIMUM_CONFIDENCE_FROM_BATCH_OF_DETECTIONS = """ +def run(self, predictions: Batch[sv.Detections]) -> BlockResult: + result = [] + for prediction in predictions: + result.append({"max_confidence": np.max(prediction.confidence).item()}) + return result +""" + +WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_ON_BATCH = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "MaxConfidence", + "inputs": { + "predictions": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output"], + }, + }, + "outputs": { + "max_confidence": { + "type": "DynamicOutputDefinition", + "kind": ["float_zero_to_one"], + } + }, + "accepts_batch_input": True, + }, + "code": { + "type": "PythonCode", + "run_function_code": FUNCTION_TO_GET_MAXIMUM_CONFIDENCE_FROM_BATCH_OF_DETECTIONS, + }, + }, + ], + "steps": [ + { + "type": "RoboflowObjectDetectionModel", + "name": "model", + "image": "$inputs.image", + "model_id": "yolov8n-640", + }, + { + "type": "MaxConfidence", + "name": "confidence_aggregation", + "predictions": "$steps.model.predictions", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "max_confidence", + "selector": "$steps.confidence_aggregation.max_confidence", + }, + ], +} + + +@pytest.mark.asyncio +async def test_workflow_with_custom_python_block_operating_on_batch( + model_manager: ModelManager, + dogs_image: np.ndarray, + crowd_image: np.ndarray, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + workflow_definition=WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_ON_BATCH, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + # when + result = await execution_engine.run_async( + runtime_parameters={ + "image": [dogs_image, crowd_image], + } + ) + + # then + assert isinstance(result, list), "Expected list to be delivered" + assert len(result) == 2, "Expected 2 elements in the output for two input images" + assert set(result[0].keys()) == { + "max_confidence", + }, "Expected all declared outputs to be delivered" + assert set(result[1].keys()) == { + "max_confidence", + }, "Expected all declared outputs to be delivered" + assert ( + abs(result[0]["max_confidence"] - 0.85599) < 1e-3 + ), "Expected max confidence to be extracted" + assert ( + abs(result[1]["max_confidence"] - 0.84284) < 1e-3 + ), "Expected max confidence to be extracted" + + +FUNCTION_TO_ASSOCIATE_DETECTIONS_FOR_CROPS = """ +def my_function(self, prediction: sv.Detections, crops: Batch[WorkflowImageData]) -> BlockResult: + detection_id2bbox = { + detection_id.item(): i for i, detection_id in enumerate(prediction.data["detection_id"]) + } + results = [] + for crop in crops: + parent_id = crop.parent_metadata.parent_id + results.append({"associated_detections": prediction[detection_id2bbox[parent_id]]}) + return results +""" + + 
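+# Note on the function above: it is invoked once per input image - `prediction`
+# arrives as a single sv.Detections object, while `crops` is a Batch of crop
+# images sitting one dimensionality level deeper (hence `dimensionality_offset: 1`
+# and `is_dimensionality_reference` on `crops` in the manifest below). Each crop's
+# `parent_metadata.parent_id` is expected to match the `detection_id` of the
+# detection it originates from, which is what makes the association lookup work.
+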
+WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_CROSS_DIMENSIONS = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "DetectionsToCropsAssociation", + "inputs": { + "prediction": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output"], + "selector_data_kind": { + "step_output": [ + "Batch[object_detection_prediction]", + "Batch[instance_segmentation_prediction]", + "Batch[keypoint_detection_prediction]", + ] + }, + }, + "crops": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output_image"], + "is_dimensionality_reference": True, + "dimensionality_offset": 1, + }, + }, + "outputs": { + "associated_detections": { + "type": "DynamicOutputDefinition", + "kind": [ + "Batch[object_detection_prediction]", + "Batch[instance_segmentation_prediction]", + "Batch[keypoint_detection_prediction]", + ], + } + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": FUNCTION_TO_ASSOCIATE_DETECTIONS_FOR_CROPS, + "run_function_name": "my_function", + }, + }, + ], + "steps": [ + { + "type": "RoboflowObjectDetectionModel", + "name": "model", + "image": "$inputs.image", + "model_id": "yolov8n-640", + }, + { + "type": "Crop", + "name": "crop", + "image": "$inputs.image", + "predictions": "$steps.model.predictions", + }, + { + "type": "DetectionsToCropsAssociation", + "name": "detections_associations", + "prediction": "$steps.model.predictions", + "crops": "$steps.crop.crops", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "associated_detections", + "selector": "$steps.detections_associations.associated_detections", + }, + ], +} + + +@pytest.mark.asyncio +async def test_workflow_with_custom_python_block_operating_cross_dimensions( + model_manager: ModelManager, + dogs_image: np.ndarray, + crowd_image: np.ndarray, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + workflow_definition=WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_CROSS_DIMENSIONS, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + # when + result = await execution_engine.run_async( + runtime_parameters={ + "image": [dogs_image, crowd_image], + } + ) + + # then + assert isinstance(result, list), "Expected list to be delivered" + assert len(result) == 2, "Expected 2 elements in the output for two input images" + assert set(result[0].keys()) == { + "associated_detections", + }, "Expected all declared outputs to be delivered" + assert set(result[1].keys()) == { + "associated_detections", + }, "Expected all declared outputs to be delivered" + assert len(result[1]["associated_detections"]) == 12 + class_names_first_image_crops = [ + e["class_name"].tolist() for e in result[0]["associated_detections"] + ] + for class_names in class_names_first_image_crops: + assert len(class_names) == 1, "Expected single bbox to be associated" + assert len(class_names_first_image_crops) == 2, "Expected 2 crops for first image" + class_names_second_image_crops = [ + e["class_name"].tolist() for e in result[1]["associated_detections"] + ] + for class_names in class_names_second_image_crops: + assert len(class_names) == 1, "Expected single bbox to be associated" + assert ( + len(class_names_second_image_crops) == 
12 + ), "Expected 12 crops for second image" + + +@pytest.mark.asyncio +@mock.patch.object(block_assembler, "ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS", False) +async def test_workflow_with_custom_python_block_when_custom_python_execution_forbidden( + model_manager: ModelManager, + dogs_image: np.ndarray, + crowd_image: np.ndarray, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + + # when + with pytest.raises(WorkflowEnvironmentConfigurationError): + _ = ExecutionEngine.init( + workflow_definition=WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_CROSS_DIMENSIONS, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + +FUNCTION_TO_MERGE_CROPS_INTO_TILES = """ +def run(self, crops: Optional[Batch[Optional[WorkflowImageData]]]) -> BlockResult: + if crops is None: + return {"tiles": None} + black_image = np.zeros((192, 168, 3), dtype=np.uint8) + images = [crop.numpy_image if crop is not None else black_image for crop in crops] + return {"tiles": sv.create_tiles(images)} +""" + + +WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_DIMENSIONALITY_REDUCTION = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "DimensionalityReduction", + "inputs": { + "crops": { + "type": "DynamicInputDefinition", + "selector_types": ["step_output_image"], + }, + }, + "outputs": {"tiles": {"type": "DynamicOutputDefinition", "kind": []}}, + "output_dimensionality_offset": -1, + "accepts_empty_values": True, + }, + "code": { + "type": "PythonCode", + "run_function_code": FUNCTION_TO_MERGE_CROPS_INTO_TILES, + }, + }, + ], + "steps": [ + { + "type": "RoboflowObjectDetectionModel", + "name": "model", + "image": "$inputs.image", + "model_id": "yolov8n-640", + "class_filter": ["person"], + }, + { + "type": "Crop", + "name": "crop", + "image": "$inputs.image", + "predictions": "$steps.model.predictions", + }, + { + "type": "DimensionalityReduction", + "name": "tile_creation", + "crops": "$steps.crop.crops", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "tiles", + "selector": "$steps.tile_creation.tiles", + }, + ], +} + + +@pytest.mark.asyncio +async def test_workflow_with_custom_python_block_reducing_dimensionality( + model_manager: ModelManager, + dogs_image: np.ndarray, + crowd_image: np.ndarray, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + workflow_definition=WORKFLOW_WITH_PYTHON_BLOCK_RUNNING_DIMENSIONALITY_REDUCTION, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + # when + result = await execution_engine.run_async( + runtime_parameters={ + "image": [dogs_image, crowd_image], + } + ) + + # then + assert isinstance(result, list), "Expected list to be delivered" + assert len(result) == 2, "Expected 2 elements in the output for two input images" + assert set(result[0].keys()) == { + "tiles", + }, "Expected all declared outputs to be delivered" + assert set(result[1].keys()) == { + "tiles", + }, "Expected all declared outputs to be delivered" + assert result[0]["tiles"] is None, "Expected no crops 
- hence empty output" + assert isinstance(result[1]["tiles"], np.ndarray), "Expected np array with tile" + + +MODEL_INIT_FUNCTION = """ +def init_model() -> Dict[str, Any]: + model = YOLOv8ObjectDetection(model_id="yolov8n-640") + return {"model": model} +""" + +MODEL_INFER_FUNCTION = """ +def infer(self, image: WorkflowImageData) -> BlockResult: + predictions = self._init_results["model"].infer(image.numpy_image) + return {"predictions": sv.Detections.from_inference(predictions[0].model_dump(by_alias=True, exclude_none=True))} +""" + +WORKFLOW_WITH_PYTHON_BLOCK_HOSTING_MODEL = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "CustomModel", + "inputs": { + "image": { + "type": "DynamicInputDefinition", + "selector_types": ["input_image"], + }, + }, + "outputs": { + "predictions": { + "type": "DynamicOutputDefinition", + "kind": [ + "Batch[object_detection_prediction]", + ], + } + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": MODEL_INFER_FUNCTION, + "run_function_name": "infer", + "init_function_code": MODEL_INIT_FUNCTION, + "init_function_name": "init_model", + "imports": [ + "from inference.models.yolov8 import YOLOv8ObjectDetection", + ], + }, + }, + ], + "steps": [ + { + "type": "CustomModel", + "name": "model", + "image": "$inputs.image", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "predictions", + "selector": "$steps.model.predictions", + }, + ], +} + + +@pytest.mark.asyncio +async def test_workflow_with_custom_python_block_running_custom_model( + model_manager: ModelManager, + dogs_image: np.ndarray, + crowd_image: np.ndarray, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + workflow_definition=WORKFLOW_WITH_PYTHON_BLOCK_HOSTING_MODEL, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + # when + result = await execution_engine.run_async( + runtime_parameters={ + "image": [dogs_image, crowd_image], + } + ) + + # then + assert isinstance(result, list), "Expected list to be delivered" + assert len(result) == 2, "Expected 2 elements in the output for two input images" + assert set(result[0].keys()) == { + "predictions", + }, "Expected all declared outputs to be delivered" + assert set(result[1].keys()) == { + "predictions", + }, "Expected all declared outputs to be delivered" + assert np.allclose( + result[0]["predictions"].confidence, + [0.85599, 0.50392], + atol=1e-3, + ), "Expected reproducible predictions for first image" + assert np.allclose( + result[1]["predictions"].confidence, + [ + 0.84284, + 0.83957, + 0.81555, + 0.80455, + 0.75804, + 0.75794, + 0.71715, + 0.71408, + 0.71003, + 0.56938, + 0.54092, + 0.43511, + ], + atol=1e-3, + ), "Expected reproducible predictions for second image" + + +BROKEN_RUN_FUNCTION = """ +def run(some: InvalidType): + pass +""" + + +WORKFLOW_WITH_CODE_THAT_DOES_NOT_COMPILE = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "CustomModel", + "inputs": { + "image": { + "type": "DynamicInputDefinition", + 
"selector_types": ["input_image"], + }, + }, + "outputs": { + "predictions": { + "type": "DynamicOutputDefinition", + "kind": [], + } + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": BROKEN_RUN_FUNCTION, + }, + }, + ], + "steps": [ + { + "type": "CustomModel", + "name": "model", + "image": "$inputs.image", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "predictions", + "selector": "$steps.model.predictions", + }, + ], +} + + +@pytest.mark.asyncio +async def test_workflow_with_custom_python_block_when_code_cannot_be_compiled( + model_manager: ModelManager, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + + # when + with pytest.raises(DynamicBlockError): + _ = ExecutionEngine.init( + workflow_definition=WORKFLOW_WITH_CODE_THAT_DOES_NOT_COMPILE, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + +WORKFLOW_WITHOUT_RUN_FUNCTION = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "CustomModel", + "inputs": { + "image": { + "type": "DynamicInputDefinition", + "selector_types": ["input_image"], + }, + }, + "outputs": { + "predictions": { + "type": "DynamicOutputDefinition", + "kind": [], + } + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": "", + }, + }, + ], + "steps": [ + { + "type": "CustomModel", + "name": "model", + "image": "$inputs.image", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "predictions", + "selector": "$steps.model.predictions", + }, + ], +} + + +@pytest.mark.asyncio +async def test_workflow_with_custom_python_block_when_code_does_not_define_declared_run_function( + model_manager: ModelManager, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + + # when + with pytest.raises(DynamicBlockError): + _ = ExecutionEngine.init( + workflow_definition=WORKFLOW_WITHOUT_RUN_FUNCTION, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + +WORKFLOW_WITHOUT_DECLARED_INIT_FUNCTION = { + "version": "1.0", + "inputs": [ + {"type": "WorkflowImage", "name": "image"}, + ], + "dynamic_blocks_definitions": [ + { + "type": "DynamicBlockDefinition", + "manifest": { + "type": "ManifestDescription", + "block_type": "CustomModel", + "inputs": { + "image": { + "type": "DynamicInputDefinition", + "selector_types": ["input_image"], + }, + }, + "outputs": { + "predictions": { + "type": "DynamicOutputDefinition", + "kind": [], + } + }, + }, + "code": { + "type": "PythonCode", + "run_function_code": MODEL_INFER_FUNCTION, + "run_function_name": "infer", + "init_function_code": "", + "init_function_name": "init_model", + "imports": [ + "from inference.models.yolov8 import YOLOv8ObjectDetection", + ], + }, + }, + ], + "steps": [ + { + "type": "CustomModel", + "name": "model", + "image": "$inputs.image", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "predictions", + "selector": "$steps.model.predictions", + }, + ], +} + + +@pytest.mark.asyncio +async def test_workflow_with_custom_python_block_when_code_does_not_define_declared_init_function( + 
model_manager: ModelManager, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + + # when + with pytest.raises(DynamicBlockError): + _ = ExecutionEngine.init( + workflow_definition=WORKFLOW_WITHOUT_DECLARED_INIT_FUNCTION, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) diff --git a/tests/workflows/unit_tests/core_steps/formatters/test_property_extraction.py b/tests/workflows/unit_tests/core_steps/formatters/test_property_extraction.py index 013bdb7ebc..db08c2cb9a 100644 --- a/tests/workflows/unit_tests/core_steps/formatters/test_property_extraction.py +++ b/tests/workflows/unit_tests/core_steps/formatters/test_property_extraction.py @@ -2,8 +2,8 @@ from inference.core.entities.responses.inference import ( ClassificationInferenceResponse, - InferenceResponseImage, ClassificationPrediction, + InferenceResponseImage, ) from inference.core.workflows.core_steps.common.query_language.entities.operations import ( OperationsChain, diff --git a/tests/workflows/unit_tests/core_steps/fusion/test_detections_classes_replacement.py b/tests/workflows/unit_tests/core_steps/fusion/test_detections_classes_replacement.py index 944a9ec1de..a4d9a9367a 100644 --- a/tests/workflows/unit_tests/core_steps/fusion/test_detections_classes_replacement.py +++ b/tests/workflows/unit_tests/core_steps/fusion/test_detections_classes_replacement.py @@ -1,15 +1,20 @@ import numpy as np import pytest - import supervision as sv from supervision.config import CLASS_NAME_DATA_FIELD -from inference.core.entities.responses.inference import MultiLabelClassificationInferenceResponse, \ - InferenceResponseImage, MultiLabelClassificationPrediction, ClassificationInferenceResponse, \ - ClassificationPrediction +from inference.core.entities.responses.inference import ( + ClassificationInferenceResponse, + ClassificationPrediction, + InferenceResponseImage, + MultiLabelClassificationInferenceResponse, + MultiLabelClassificationPrediction, +) from inference.core.workflows.constants import DETECTION_ID_KEY -from inference.core.workflows.core_steps.fusion.detections_classes_replacement import DetectionsClassesReplacementBlock, \ - extract_leading_class_from_prediction +from inference.core.workflows.core_steps.fusion.detections_classes_replacement import ( + DetectionsClassesReplacementBlock, + extract_leading_class_from_prediction, +) from inference.core.workflows.entities.base import Batch @@ -25,7 +30,9 @@ async def test_classes_replacement_when_object_detection_object_is_none() -> Non ) # then - assert result == {"predictions": None}, "object_detection_predictions is superior object so lack of value means lack of output" + assert result == { + "predictions": None + }, "object_detection_predictions is superior object so lack of value means lack of output" @pytest.mark.asyncio @@ -43,24 +50,30 @@ async def test_classes_replacement_when_there_are_no_predictions_is_none() -> No ) # then - assert result == {"predictions": sv.Detections.empty()}, "classification_predictions is inferior object so lack of value means empty output" + assert result == { + "predictions": sv.Detections.empty() + }, "classification_predictions is inferior object so lack of value means empty output" @pytest.mark.asyncio -async def test_classes_replacement_when_replacement_to_happen_without_filtering_for_multi_label_results() -> None: +async def 
test_classes_replacement_when_replacement_to_happen_without_filtering_for_multi_label_results() -> ( + None +): # given step = DetectionsClassesReplacementBlock() detections = sv.Detections( - xyxy=np.array([ - [10, 20, 30, 40], - [11, 21, 31, 41], - ]), + xyxy=np.array( + [ + [10, 20, 30, 40], + [11, 21, 31, 41], + ] + ), class_id=np.array([7, 7]), confidence=np.array([0.36, 0.91]), data={ "class_name": np.array(["animal", "animal"]), - "detection_id": np.array(["zero", "one"]) - } + "detection_id": np.array(["zero", "one"]), + }, ) first_cls_prediction = MultiLabelClassificationInferenceResponse( image=InferenceResponseImage(width=128, height=256), @@ -85,7 +98,7 @@ async def test_classes_replacement_when_replacement_to_happen_without_filtering_ first_cls_prediction, second_cls_prediction, ], - indices=[(0, 0), (0, 1)] + indices=[(0, 0), (0, 1)], ) # when @@ -95,28 +108,44 @@ async def test_classes_replacement_when_replacement_to_happen_without_filtering_ ) # then - assert np.allclose(result["predictions"].xyxy, np.array([[10, 20, 30, 40], [11, 21, 31, 41]])), "Expected coordinates not to be touched" - assert np.allclose(result["predictions"].confidence, np.array([0.6, 0.4])), "Expected to choose [cat, dog] confidences" - assert np.allclose(result["predictions"].class_id, np.array([0, 1])), "Expected to choose [cat, dog] class ids" - assert result["predictions"].data["class_name"].tolist() == ["cat", "dog"], "Expected cat class to be assigned" - assert result["predictions"].data["detection_id"].tolist() != ["zero", "one"], "Expected to generate new detection id" + assert np.allclose( + result["predictions"].xyxy, np.array([[10, 20, 30, 40], [11, 21, 31, 41]]) + ), "Expected coordinates not to be touched" + assert np.allclose( + result["predictions"].confidence, np.array([0.6, 0.4]) + ), "Expected to choose [cat, dog] confidences" + assert np.allclose( + result["predictions"].class_id, np.array([0, 1]) + ), "Expected to choose [cat, dog] class ids" + assert result["predictions"].data["class_name"].tolist() == [ + "cat", + "dog", + ], "Expected cat class to be assigned" + assert result["predictions"].data["detection_id"].tolist() != [ + "zero", + "one", + ], "Expected to generate new detection id" @pytest.mark.asyncio -async def test_classes_replacement_when_replacement_to_happen_without_filtering_for_multi_class_results() -> None: +async def test_classes_replacement_when_replacement_to_happen_without_filtering_for_multi_class_results() -> ( + None +): # given step = DetectionsClassesReplacementBlock() detections = sv.Detections( - xyxy=np.array([ - [10, 20, 30, 40], - [11, 21, 31, 41], - ]), + xyxy=np.array( + [ + [10, 20, 30, 40], + [11, 21, 31, 41], + ] + ), class_id=np.array([7, 7]), confidence=np.array([0.36, 0.91]), data={ "class_name": np.array(["animal", "animal"]), - "detection_id": np.array(["zero", "one"]) - } + "detection_id": np.array(["zero", "one"]), + }, ) first_cls_prediction = ClassificationInferenceResponse( image=InferenceResponseImage(width=128, height=256), @@ -153,7 +182,7 @@ async def test_classes_replacement_when_replacement_to_happen_without_filtering_ first_cls_prediction, second_cls_prediction, ], - indices=[(0, 0), (0, 1)] + indices=[(0, 0), (0, 1)], ) # when @@ -163,28 +192,44 @@ async def test_classes_replacement_when_replacement_to_happen_without_filtering_ ) # then - assert np.allclose(result["predictions"].xyxy, np.array([[10, 20, 30, 40], [11, 21, 31, 41]])), "Expected coordinates not to be touched" - assert np.allclose(result["predictions"].confidence, 
np.array([0.6, 0.6])), "Expected to choose [cat, dog] confidences" - assert np.allclose(result["predictions"].class_id, np.array([0, 1])), "Expected to choose [cat, dog] class ids" - assert result["predictions"].data["class_name"].tolist() == ["cat", "dog"], "Expected cat class to be assigned" - assert result["predictions"].data["detection_id"].tolist() != ["zero", "one"], "Expected to generate new detection id" + assert np.allclose( + result["predictions"].xyxy, np.array([[10, 20, 30, 40], [11, 21, 31, 41]]) + ), "Expected coordinates not to be touched" + assert np.allclose( + result["predictions"].confidence, np.array([0.6, 0.6]) + ), "Expected to choose [cat, dog] confidences" + assert np.allclose( + result["predictions"].class_id, np.array([0, 1]) + ), "Expected to choose [cat, dog] class ids" + assert result["predictions"].data["class_name"].tolist() == [ + "cat", + "dog", + ], "Expected cat class to be assigned" + assert result["predictions"].data["detection_id"].tolist() != [ + "zero", + "one", + ], "Expected to generate new detection id" @pytest.mark.asyncio -async def test_classes_replacement_when_replacement_to_happen_and_one_result_to_be_filtered_out() -> None: +async def test_classes_replacement_when_replacement_to_happen_and_one_result_to_be_filtered_out() -> ( + None +): # given step = DetectionsClassesReplacementBlock() detections = sv.Detections( - xyxy=np.array([ - [10, 20, 30, 40], - [11, 21, 31, 41], - ]), + xyxy=np.array( + [ + [10, 20, 30, 40], + [11, 21, 31, 41], + ] + ), class_id=np.array([7, 7]), confidence=np.array([0.36, 0.91]), data={ "class_name": np.array(["animal", "animal"]), - "detection_id": np.array(["zero", "one"]) - } + "detection_id": np.array(["zero", "one"]), + }, ) first_cls_prediction = MultiLabelClassificationInferenceResponse( image=InferenceResponseImage(width=128, height=256), @@ -200,7 +245,7 @@ async def test_classes_replacement_when_replacement_to_happen_and_one_result_to_ first_cls_prediction, None, ], - indices=[(0, 0), (0, 1)] + indices=[(0, 0), (0, 1)], ) # when @@ -210,13 +255,27 @@ async def test_classes_replacement_when_replacement_to_happen_and_one_result_to_ ) # then - assert len(result["predictions"]) == 1, "Expected only one bbox left, as there was mo cls result for second bbox" - assert np.allclose(result["predictions"].xyxy, np.array([[10, 20, 30, 40]])), "Expected first bbox to be left" - assert np.allclose(result["predictions"].confidence, np.array([0.6])), "Expected to choose cat confidence" - assert np.allclose(result["predictions"].class_id, np.array([0])), "Expected to choose cat class id" - assert result["predictions"].data["class_name"].tolist() == ["cat"], "Expected cat class to be assigned" - assert len(result["predictions"].data["detection_id"]) == 1, "Expected only single detection_id" - assert result["predictions"].data["detection_id"].tolist() != ["zero"], "Expected to generate new detection id" + assert ( + len(result["predictions"]) == 1 + ), "Expected only one bbox left, as there was mo cls result for second bbox" + assert np.allclose( + result["predictions"].xyxy, np.array([[10, 20, 30, 40]]) + ), "Expected first bbox to be left" + assert np.allclose( + result["predictions"].confidence, np.array([0.6]) + ), "Expected to choose cat confidence" + assert np.allclose( + result["predictions"].class_id, np.array([0]) + ), "Expected to choose cat class id" + assert result["predictions"].data["class_name"].tolist() == [ + "cat" + ], "Expected cat class to be assigned" + assert ( + 
len(result["predictions"].data["detection_id"]) == 1 + ), "Expected only single detection_id" + assert result["predictions"].data["detection_id"].tolist() != [ + "zero" + ], "Expected to generate new detection id" def test_extract_leading_class_from_prediction_when_prediction_is_multi_label() -> None: @@ -243,7 +302,9 @@ def test_extract_leading_class_from_prediction_when_prediction_is_multi_label() assert result == ("cat", 0, 0.6) -def test_extract_leading_class_from_prediction_when_prediction_is_faulty_multi_label() -> None: +def test_extract_leading_class_from_prediction_when_prediction_is_faulty_multi_label() -> ( + None +): # given prediction = ClassificationInferenceResponse( image=InferenceResponseImage(width=128, height=256), @@ -265,7 +326,9 @@ def test_extract_leading_class_from_prediction_when_prediction_is_faulty_multi_l _ = extract_leading_class_from_prediction(prediction=prediction) -def test_extract_leading_class_from_prediction_when_prediction_is_multi_class_with_predicted_classes() -> None: +def test_extract_leading_class_from_prediction_when_prediction_is_multi_class_with_predicted_classes() -> ( + None +): # given prediction = MultiLabelClassificationInferenceResponse( image=InferenceResponseImage(width=128, height=256), @@ -283,7 +346,9 @@ def test_extract_leading_class_from_prediction_when_prediction_is_multi_class_wi assert result == ("cat", 0, 0.6) -def test_extract_leading_class_from_prediction_when_prediction_is_multi_class_without_predicted_classes() -> None: +def test_extract_leading_class_from_prediction_when_prediction_is_multi_class_without_predicted_classes() -> ( + None +): # given prediction = MultiLabelClassificationInferenceResponse( image=InferenceResponseImage(width=128, height=256), @@ -301,12 +366,13 @@ def test_extract_leading_class_from_prediction_when_prediction_is_multi_class_wi assert result is None -def test_extract_leading_class_from_prediction_when_prediction_is_multi_class_without_classes_defined() -> None: +def test_extract_leading_class_from_prediction_when_prediction_is_multi_class_without_classes_defined() -> ( + None +): # given prediction = MultiLabelClassificationInferenceResponse( image=InferenceResponseImage(width=128, height=256), - predictions={ - }, + predictions={}, predicted_classes=[], ).dict(by_alias=True, exclude_none=True) diff --git a/tests/workflows/unit_tests/core_steps/fusion/test_domension_collapse.py b/tests/workflows/unit_tests/core_steps/fusion/test_domension_collapse.py index 1b7907a829..209d6e4be2 100644 --- a/tests/workflows/unit_tests/core_steps/fusion/test_domension_collapse.py +++ b/tests/workflows/unit_tests/core_steps/fusion/test_domension_collapse.py @@ -1,6 +1,8 @@ import pytest -from inference.core.workflows.core_steps.fusion.dimension_collapse import DimensionCollapseBlock +from inference.core.workflows.core_steps.fusion.dimension_collapse import ( + DimensionCollapseBlock, +) from inference.core.workflows.entities.base import Batch @@ -8,10 +10,7 @@ async def test_dimension_collapse() -> None: # given step = DimensionCollapseBlock() - data = Batch( - content=[1, 2, 3, 4], - indices=[(0, 1), (0, 2), (0, 3), (0, 4)] - ) + data = Batch(content=[1, 2, 3, 4], indices=[(0, 1), (0, 2), (0, 3), (0, 4)]) # when result = await step.run(data=data) diff --git a/tests/workflows/unit_tests/execution_engine/dynamic_blocs/__init__.py b/tests/workflows/unit_tests/execution_engine/dynamic_blocs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git 
diff --git a/tests/workflows/unit_tests/execution_engine/dynamic_blocs/test_block_assembler.py b/tests/workflows/unit_tests/execution_engine/dynamic_blocs/test_block_assembler.py
new file mode 100644
index 0000000000..1c76126b93
--- /dev/null
+++ b/tests/workflows/unit_tests/execution_engine/dynamic_blocs/test_block_assembler.py
@@ -0,0 +1,505 @@
+from typing import Type, Union
+from unittest import mock
+
+import pytest
+from pydantic import ValidationError
+from pydantic_core import PydanticUndefinedType
+
+from inference.core.workflows.entities.base import OutputDefinition
+from inference.core.workflows.entities.types import (
+    WILDCARD_KIND,
+    Kind,
+    StepOutputImageSelector,
+    StepOutputSelector,
+    WorkflowImageSelector,
+    WorkflowParameterSelector,
+)
+from inference.core.workflows.errors import DynamicBlockError
+from inference.core.workflows.execution_engine.dynamic_blocks import block_assembler
+from inference.core.workflows.execution_engine.dynamic_blocks.block_assembler import (
+    build_input_field_metadata,
+    build_outputs_definitions,
+    collect_input_dimensionality_offsets,
+    collect_python_types_for_selectors,
+    collect_python_types_for_values,
+    create_dynamic_block_specification,
+    pick_dimensionality_reference_property,
+)
+from inference.core.workflows.execution_engine.dynamic_blocks.entities import (
+    DynamicBlockDefinition,
+    DynamicInputDefinition,
+    DynamicOutputDefinition,
+    ManifestDescription,
+    PythonCode,
+    SelectorType,
+    ValueType,
+)
+
+
+def test_pick_dimensionality_reference_property_when_there_is_no_reference_property() -> (
+    None
+):
+    # given
+    inputs = {
+        "a": DynamicInputDefinition(
+            type="DynamicInputDefinition", selector_types=[SelectorType.INPUT_PARAMETER]
+        ),
+        "b": DynamicInputDefinition(
+            type="DynamicInputDefinition", value_types=[ValueType.INTEGER]
+        ),
+    }
+
+    # when
+    result = pick_dimensionality_reference_property(
+        block_type="some",
+        inputs=inputs,
+    )
+
+    # then
+    assert result is None
+
+
+def test_pick_dimensionality_reference_property_when_there_is_single_reference_property() -> (
+    None
+):
+    # given
+    inputs = {
+        "a": DynamicInputDefinition(
+            type="DynamicInputDefinition",
+            selector_types=[SelectorType.INPUT_PARAMETER],
+            is_dimensionality_reference=True,
+        ),
+        "b": DynamicInputDefinition(
+            type="DynamicInputDefinition", value_types=[ValueType.INTEGER]
+        ),
+    }
+
+    # when
+    result = pick_dimensionality_reference_property(
+        block_type="some",
+        inputs=inputs,
+    )
+
+    # then
+    assert result == "a", "Expected `a` to be picked as dimensionality reference"
+
+
+def test_pick_dimensionality_reference_property_when_there_are_multiple_reference_properties() -> (
+    None
+):
+    # given
+    inputs = {
+        "a": DynamicInputDefinition(
+            type="DynamicInputDefinition",
+            selector_types=[SelectorType.INPUT_PARAMETER],
+            is_dimensionality_reference=True,
+        ),
+        "b": DynamicInputDefinition(
+            type="DynamicInputDefinition",
+            value_types=[ValueType.INTEGER],
+            is_dimensionality_reference=True,
+        ),
+    }
+
+    # when
+    with pytest.raises(DynamicBlockError):
+        _ = pick_dimensionality_reference_property(
+            block_type="some",
+            inputs=inputs,
+        )
+
+
+def test_build_outputs_definitions_when_build_should_succeed() -> None:
+    # given
+    outputs = {
+        "a": DynamicOutputDefinition(type="DynamicOutputDefinition"),
+        "b": DynamicOutputDefinition(
+            type="DynamicOutputDefinition", kind=["string", "integer"]
+        ),
+    }
+    kinds_lookup = {
+        "*": WILDCARD_KIND,
+        "string": Kind(name="string"),
+        "integer": Kind(name="integer"),
+    }
+
+    # when
+    result = build_outputs_definitions(
+        block_type="some",
+        outputs=outputs,
+        kinds_lookup=kinds_lookup,
+    )
+
+    # then
+    assert result == [
+        OutputDefinition(name="a", kind=[WILDCARD_KIND]),
+        OutputDefinition(
+            name="b", kind=[kinds_lookup["string"], kinds_lookup["integer"]]
+        ),
+    ], "Expected outputs to be built such that `a` has * kind and `b` has exactly the kinds that were defined"
+
+
+def test_build_outputs_definitions_when_build_should_fail_on_not_recognised_kind() -> (
+    None
+):
+    # given
+    outputs = {
+        "a": DynamicOutputDefinition(type="DynamicOutputDefinition"),
+        "b": DynamicOutputDefinition(
+            type="DynamicOutputDefinition", kind=["string", "integer"]
+        ),
+    }
+    kinds_lookup = {
+        "*": WILDCARD_KIND,
+        "string": Kind(name="string"),
+    }
+
+    # when
+    with pytest.raises(DynamicBlockError):
+        _ = build_outputs_definitions(
+            block_type="some",
+            outputs=outputs,
+            kinds_lookup=kinds_lookup,
+        )
+
+
+def test_collect_input_dimensionality_offsets() -> None:
+    # given
+    inputs = {
+        "a": DynamicInputDefinition(
+            type="DynamicInputDefinition",
+            selector_types=[SelectorType.INPUT_PARAMETER],
+            dimensionality_offset=1,
+        ),
+        "b": DynamicInputDefinition(
+            type="DynamicInputDefinition",
+            selector_types=[SelectorType.INPUT_PARAMETER],
+        ),
+        "c": DynamicInputDefinition(
+            type="DynamicInputDefinition",
+            selector_types=[SelectorType.INPUT_PARAMETER],
+            dimensionality_offset=-1,
+        ),
+    }
+
+    # when
+    result = collect_input_dimensionality_offsets(inputs=inputs)
+
+    # then
+    assert result == {
+        "a": 1,
+        "c": -1,
+    }, "Expected only entries with non-default values to be given in results"
+
+
+def test_build_input_field_metadata_for_field_without_default_value() -> None:
+    # given
+    input_definition = DynamicInputDefinition(
+        type="DynamicInputDefinition",
+        selector_types=[SelectorType.INPUT_PARAMETER],
+        dimensionality_offset=1,
+    )
+
+    # when
+    result = build_input_field_metadata(input_definition=input_definition)
+
+    # then
+    assert isinstance(result.default, PydanticUndefinedType)
+
+
+def test_build_input_field_metadata_for_field_with_default_being_none() -> None:
+    # given
+    input_definition = DynamicInputDefinition(
+        type="DynamicInputDefinition",
+        value_types=[ValueType.INTEGER],
+        is_optional=True,
+        has_default_value=True,
+    )
+
+    # when
+    result = build_input_field_metadata(input_definition=input_definition)
+
+    # then
+    assert result.default is None
+
+
+def test_build_input_field_metadata_for_field_with_default_being_primitive() -> None:
+    # given
+    input_definition = DynamicInputDefinition(
+        type="DynamicInputDefinition",
+        value_types=[ValueType.INTEGER],
+        is_optional=True,
+        has_default_value=True,
+        default_value=3.0,
+    )
+
+    # when
+    result = build_input_field_metadata(input_definition=input_definition)
+
+    # then
+    assert result.default == 3
+
+
+@pytest.mark.parametrize("default_type", [list, set, dict])
+def test_build_input_field_metadata_for_field_with_default_being_compound(
+    default_type: Union[Type[list], Type[set], Type[dict]],
+) -> None:
+    # given
+    input_definition = DynamicInputDefinition(
+        type="DynamicInputDefinition",
+        value_types=[ValueType.LIST],
+        has_default_value=True,
+        default_value=default_type(),
+    )
+
+    # when
+    result = build_input_field_metadata(input_definition=input_definition)
+
+    # then
+    assert (
+        result.default_factory() == default_type()
+    ), "Expected default_factory to create a new instance of the compound element"
+
+
+@pytest.mark.parametrize(
+    "default_value", [[2, 3, 4], {"a", "b", "c"}, {"a": 1, "b": 2}]
+)
+def test_build_input_field_metadata_for_field_with_default_being_non_empty_compound(
+    default_value: Union[set, list, dict],
+) -> None:
+    # given
+    input_definition = DynamicInputDefinition(
+        type="DynamicInputDefinition",
+        value_types=[ValueType.LIST],
+        has_default_value=True,
+        default_value=default_value,
+    )
+
+    # when
+    result = build_input_field_metadata(input_definition=input_definition)
+
+    # then
+    assert (
+        result.default_factory() == default_value
+    ), "Expected default_factory to create an identical instance of the compound data"
+    assert id(result.default_factory()) != id(
+        default_value
+    ), "Expected default_factory to create a new instance of the compound data"
+
+
+def test_collect_python_types_for_values_when_types_can_be_resolved() -> None:
+    # given
+    input_definition = DynamicInputDefinition(
+        type="DynamicInputDefinition",
+        value_types=[ValueType.LIST, ValueType.INTEGER],
+    )
+
+    # when
+    result = collect_python_types_for_values(
+        block_type="some",
+        input_name="a",
+        input_definition=input_definition,
+    )
+
+    # then
+    assert result == [list, int], "Expected python types to be resolved properly"
+
+
+@mock.patch.object(block_assembler, "PYTHON_TYPES_MAPPING", {})
+def test_collect_python_types_for_values_when_type_cannot_be_resolved() -> None:
+    # given
+    input_definition = DynamicInputDefinition(
+        type="DynamicInputDefinition",
+        value_types=[ValueType.LIST, ValueType.INTEGER],
+    )
+
+    # when
+    with pytest.raises(DynamicBlockError):
+        _ = collect_python_types_for_values(
+            block_type="some",
+            input_name="a",
+            input_definition=input_definition,
+        )
+
+
+def test_collect_python_types_for_selectors_when_collection_should_succeed() -> None:
+    # given
+    kinds_lookup = {
+        "*": WILDCARD_KIND,
+        "string": Kind(name="string"),
+        "integer": Kind(name="integer"),
+    }
+    input_definition = DynamicInputDefinition(
+        type="DynamicInputDefinition",
+        selector_types=[
+            SelectorType.INPUT_PARAMETER,
+            SelectorType.INPUT_IMAGE,
+            SelectorType.STEP_OUTPUT_IMAGE,
+            SelectorType.STEP_OUTPUT,
+        ],
+        selector_data_kind={SelectorType.STEP_OUTPUT: ["string", "integer"]},
+    )
+
+    # when
+    result = collect_python_types_for_selectors(
+        block_type="some",
+        input_name="a",
+        input_definition=input_definition,
+        kinds_lookup=kinds_lookup,
+    )
+
+    # then
+
+    assert len(result) == 4, "Expected union of 4 types"
+    assert repr(result[0]) == repr(
+        WorkflowParameterSelector(kind=[WILDCARD_KIND])
+    ), "Expected first element of union to be an input parameter selector of kind *"
+    assert repr(result[1]) == repr(
+        WorkflowImageSelector
+    ), "Expected second element of union to be an input image selector"
+    assert repr(result[2]) == repr(
+        StepOutputImageSelector
+    ), "Expected third element of union to be a step output image selector"
+    assert repr(result[3]) == repr(
+        StepOutputSelector(kind=[kinds_lookup["string"], kinds_lookup["integer"]])
+    ), "Expected last element of union to be a step output selector of kinds string and integer"
+
+
+def test_collect_python_types_for_selectors_when_collection_should_fail_on_unknown_kind() -> (
+    None
+):
+    # given
+    kinds_lookup = {
+        "*": WILDCARD_KIND,
+        "string": Kind(name="string"),
+    }
+    input_definition = DynamicInputDefinition(
+        type="DynamicInputDefinition",
+        selector_types=[
+            SelectorType.INPUT_PARAMETER,
+            SelectorType.INPUT_IMAGE,
+            SelectorType.STEP_OUTPUT_IMAGE,
+            SelectorType.STEP_OUTPUT,
+        ],
+        selector_data_kind={SelectorType.STEP_OUTPUT: ["string", "integer"]},
+    )
+
+    # when
+    with pytest.raises(DynamicBlockError):
+        _ = collect_python_types_for_selectors(
+            block_type="some",
input_name="a", + input_definition=input_definition, + kinds_lookup=kinds_lookup, + ) + + +PYTHON_CODE = """ +def run(self, a, b): + return {"output": b[::-1]} +""" + + +@pytest.mark.asyncio +async def test_create_dynamic_block_specification() -> None: + # given + kinds_lookup = { + "*": WILDCARD_KIND, + "string": Kind(name="string"), + "integer": Kind(name="integer"), + } + dynamic_block_definition = DynamicBlockDefinition( + type="DynamicBlockDefinition", + manifest=ManifestDescription( + type="ManifestDescription", + block_type="MyBlock", + inputs={ + "a": DynamicInputDefinition( + type="DynamicInputDefinition", + selector_types=[ + SelectorType.INPUT_PARAMETER, + SelectorType.STEP_OUTPUT, + ], + selector_data_kind={ + SelectorType.STEP_OUTPUT: ["string", "integer"] + }, + ), + "b": DynamicInputDefinition( + type="DynamicInputDefinition", + value_types=[ValueType.LIST], + has_default_value=True, + default_value=[1, 2, 3], + ), + }, + outputs={ + "a": DynamicOutputDefinition(type="DynamicOutputDefinition"), + "b": DynamicOutputDefinition( + type="DynamicOutputDefinition", kind=["string", "integer"] + ), + }, + output_dimensionality_offset=1, + accepts_batch_input=True, + ), + code=PythonCode( + type="PythonCode", + run_function_code=PYTHON_CODE, + ), + ) + + # when + result = create_dynamic_block_specification( + dynamic_block_definition=dynamic_block_definition, + kinds_lookup=kinds_lookup, + ) + + # then + assert result.block_source == "dynamic_workflows_blocks" + assert result.manifest_class.describe_outputs() == [ + OutputDefinition(name="a", kind=[WILDCARD_KIND]), + OutputDefinition( + name="b", kind=[kinds_lookup["string"], kinds_lookup["integer"]] + ), + ], "Expected outputs to be built such that `a` has * kind and `b` has exactly the kinds that were defined" + assert ( + result.manifest_class.accepts_batch_input() is True + ), "Manifest defined to accept batch input" + assert ( + result.manifest_class.accepts_empty_values() is False + ), "Manifest defined not to accept empty input" + assert ( + result.manifest_class.get_input_dimensionality_offsets() == {} + ), "No explicit offsets defined" + assert ( + result.manifest_class.get_dimensionality_reference_property() is None + ), "No dimensionality reference property expected" + assert ( + result.manifest_class.get_output_dimensionality_offset() == 1 + ), "Expected output dimensionality offset announced" + + block_instance = result.block_class() + code_run_result = await block_instance.run(a="some", b=[1, 2, 3]) + assert code_run_result == { + "output": [3, 2, 1] + }, "Expected code to work properly and revert second param" + + _ = result.manifest_class.model_validate( + {"name": "some", "type": "MyBlock", "a": "$steps.some.a", "b": [1, 2, 3, 4, 5]} + ) # no error expected + + _ = result.manifest_class.model_validate( + { + "name": "some", + "type": "MyBlock", + "a": "$steps.some.a", + } + ) # no error expected, default value for "b" defined + + with pytest.raises(ValidationError): + _ = result.manifest_class.model_validate( + {"name": "some", "type": "MyBlock", "a": "some", "b": [1, 2, 3, 4, 5]} + ) # error expected - value "a" without selector + + with pytest.raises(ValidationError): + _ = result.manifest_class.model_validate( + {"name": "some", "type": "MyBlock", "a": "$steps.some.a", "b": 1} + ) # error expected - value "b" not a list diff --git a/tests/workflows/unit_tests/execution_engine/dynamic_blocs/test_block_scaffolding.py b/tests/workflows/unit_tests/execution_engine/dynamic_blocs/test_block_scaffolding.py new file mode 
diff --git a/tests/workflows/unit_tests/execution_engine/dynamic_blocs/test_block_scaffolding.py b/tests/workflows/unit_tests/execution_engine/dynamic_blocs/test_block_scaffolding.py
new file mode 100644
index 0000000000..4411be7336
--- /dev/null
+++ b/tests/workflows/unit_tests/execution_engine/dynamic_blocs/test_block_scaffolding.py
@@ -0,0 +1,211 @@
+from unittest import mock
+
+import pytest
+
+from inference.core.workflows.core_steps.formatters.expression import BlockManifest
+from inference.core.workflows.errors import (
+    DynamicBlockError,
+    WorkflowEnvironmentConfigurationError,
+)
+from inference.core.workflows.execution_engine.dynamic_blocks import block_scaffolding
+from inference.core.workflows.execution_engine.dynamic_blocks.block_scaffolding import (
+    assembly_custom_python_block,
+    create_dynamic_module,
+)
+from inference.core.workflows.execution_engine.dynamic_blocks.entities import PythonCode
+
+
+def test_create_dynamic_module_when_syntax_error_happens() -> None:
+    # given
+    init_function = """
+def init_fun() -> Dict[str, Any]:
+    return {"a": 35}
+"""
+    run_function = """
+def run_function( -> BlockResult:
+    return {"result": a + b}
+"""
+    python_code = PythonCode(
+        type="PythonCode",
+        run_function_code=run_function,
+        run_function_name="run_function",
+        init_function_code=init_function,
+        init_function_name="init_fun",
+        imports=["import math"],
+    )
+
+    # when
+    with pytest.raises(DynamicBlockError):
+        _ = create_dynamic_module(
+            block_type_name="some", python_code=python_code, module_name="my_module"
+        )
+
+
+def test_create_dynamic_module_when_creation_should_succeed() -> None:
+    # given
+    init_function = """
+def init_fun() -> Dict[str, Any]:
+    return {"a": 35}
+"""
+    run_function = """
+def run_function(a, b) -> BlockResult:
+    return {"result": a + b}
+"""
+    python_code = PythonCode(
+        type="PythonCode",
+        run_function_code=run_function,
+        run_function_name="run_function",
+        init_function_code=init_function,
+        init_function_name="init_fun",
+        imports=["import math"],
+    )
+
+    # when
+    module = create_dynamic_module(
+        block_type_name="some", python_code=python_code, module_name="my_module"
+    )
+
+    # then
+    assert module.init_fun() == {"a": 35}
+    assert module.run_function(3, 5) == {"result": 8}
+
+
+@pytest.mark.asyncio
+async def test_assembly_custom_python_block() -> None:
+    # given
+    manifest = BlockManifest
+    init_function = """
+def init_fun() -> Dict[str, Any]:
+    return {"a": 6}
+"""
+    run_function = """
+def run_function(self, a, b) -> BlockResult:
+    return {"result": a + b + self._init_results["a"]}
+    """
+    python_code = PythonCode(
+        type="PythonCode",
+        run_function_code=run_function,
+        run_function_name="run_function",
+        init_function_code=init_function,
+        init_function_name="init_fun",
+        imports=["import math"],
+    )
+
+    # when
+    workflow_block_class = assembly_custom_python_block(
+        block_type_name="some",
+        unique_identifier="unique-id",
+        manifest=manifest,
+        python_code=python_code,
+    )
+    workflow_block_instance = workflow_block_class()
+    execution_result = await workflow_block_instance.run(a=3, b=5)
+
+    # then
+    assert (
+        workflow_block_class.get_init_parameters() == []
+    ), "Expected no init parameters defined"
+    assert (
+        workflow_block_class.get_manifest() == BlockManifest
+    ), "Expected manifest to be returned"
+    assert execution_result == {
+        "result": 14
+    }, "Expected result of 3 + 5 + 6 (last value from init)"
+
+
+@pytest.mark.asyncio
+async def test_assembly_custom_python_block_when_run_function_not_found() -> None:
+    # given
+    manifest = BlockManifest
+    init_function = """
+def init_fun() -> Dict[str, Any]:
+    return {"a": 6}
+"""
+    run_function = """
+def run_function(self, a, b) -> BlockResult:
self._init_results["a"]} + """ + python_code = PythonCode( + type="PythonCode", + run_function_code=run_function, + run_function_name="invalid", + init_function_code=init_function, + init_function_name="init_fun", + imports=["import math"], + ) + + # when + with pytest.raises(DynamicBlockError): + _ = assembly_custom_python_block( + block_type_name="some", + unique_identifier="unique-id", + manifest=manifest, + python_code=python_code, + ) + + +@pytest.mark.asyncio +async def test_assembly_custom_python_block_when_init_function_not_found() -> None: + # given + manifest = BlockManifest + init_function = """ +def init_fun() -> Dict[str, Any]: + return {"a": 6} +""" + run_function = """ +def run_function(self, a, b) -> BlockResult: + return {"result": a + b + self._init_results["a"]} + """ + python_code = PythonCode( + type="PythonCode", + run_function_code=run_function, + run_function_name="run_function", + init_function_code=init_function, + init_function_name="invalid", + imports=["import math"], + ) + + # when + with pytest.raises(DynamicBlockError): + _ = assembly_custom_python_block( + block_type_name="some", + unique_identifier="unique-id", + manifest=manifest, + python_code=python_code, + ) + + +@pytest.mark.asyncio +@mock.patch.object( + block_scaffolding, "ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS", False +) +async def test_run_assembled_custom_python_block_when_custom_python_forbidden() -> None: + # given + manifest = BlockManifest + init_function = """ +def init_fun() -> Dict[str, Any]: + return {"a": 6} +""" + run_function = """ +def run_function(self, a, b) -> BlockResult: + return {"result": a + b + self._init_results["a"]} + """ + python_code = PythonCode( + type="PythonCode", + run_function_code=run_function, + run_function_name="run_function", + init_function_code=init_function, + init_function_name="init_fun", + imports=["import math"], + ) + + # when + workflow_block_class = assembly_custom_python_block( + block_type_name="some", + unique_identifier="unique-id", + manifest=manifest, + python_code=python_code, + ) + workflow_block_instance = workflow_block_class() + with pytest.raises(WorkflowEnvironmentConfigurationError): + _ = await workflow_block_instance.run(a=3, b=5) diff --git a/tests/workflows/unit_tests/execution_engine/introspection/test_blocks_loader.py b/tests/workflows/unit_tests/execution_engine/introspection/test_blocks_loader.py index 662e2a9fc4..68dd7202c7 100644 --- a/tests/workflows/unit_tests/execution_engine/introspection/test_blocks_loader.py +++ b/tests/workflows/unit_tests/execution_engine/introspection/test_blocks_loader.py @@ -174,7 +174,7 @@ def test_load_initializers_when_plugin_exists_and_initializers_provided() -> Non result = load_initializers() # then - assert len(result) == 2 + assert len(result) == 5 assert ( result[ "tests.workflows.unit_tests.execution_engine.introspection.plugin_with_initializers.a" @@ -199,7 +199,7 @@ def test_describe_available_blocks_when_valid_plugins_are_loaded( ) # when - result = describe_available_blocks() + result = describe_available_blocks(dynamic_blocks=[]) # then assert len(result.blocks) == 2, "Expected 2 blocks to be loaded" @@ -207,7 +207,7 @@ def test_describe_available_blocks_when_valid_plugins_are_loaded( assert result.blocks[0].manifest_class == plugin_with_valid_blocks.Block1Manifest assert result.blocks[1].block_class == plugin_with_valid_blocks.Block2 assert result.blocks[1].manifest_class == plugin_with_valid_blocks.Block2Manifest - assert len(result.declared_kinds) == 3 + assert 
diff --git a/tests/workflows/unit_tests/execution_engine/introspection/test_blocks_loader.py b/tests/workflows/unit_tests/execution_engine/introspection/test_blocks_loader.py
index 662e2a9fc4..68dd7202c7 100644
--- a/tests/workflows/unit_tests/execution_engine/introspection/test_blocks_loader.py
+++ b/tests/workflows/unit_tests/execution_engine/introspection/test_blocks_loader.py
@@ -174,7 +174,7 @@ def test_load_initializers_when_plugin_exists_and_initializers_provided() -> Non
     result = load_initializers()
 
     # then
-    assert len(result) == 2
+    assert len(result) == 5
     assert (
         result[
             "tests.workflows.unit_tests.execution_engine.introspection.plugin_with_initializers.a"
         ]
@@ -199,7 +199,7 @@ def test_describe_available_blocks_when_valid_plugins_are_loaded(
     )
 
     # when
-    result = describe_available_blocks()
+    result = describe_available_blocks(dynamic_blocks=[])
 
     # then
     assert len(result.blocks) == 2, "Expected 2 blocks to be loaded"
@@ -207,7 +207,7 @@ def test_describe_available_blocks_when_valid_plugins_are_loaded(
     assert result.blocks[0].manifest_class == plugin_with_valid_blocks.Block1Manifest
     assert result.blocks[1].block_class == plugin_with_valid_blocks.Block2
     assert result.blocks[1].manifest_class == plugin_with_valid_blocks.Block2Manifest
-    assert len(result.declared_kinds) == 3
+    assert len(result.declared_kinds) == 33
 
 
 @mock.patch.object(blocks_loader, "load_workflow_blocks")
@@ -224,7 +224,7 @@ def test_describe_available_blocks_when_plugins_duplicate_class_names(
 
     # when
     with pytest.raises(PluginLoadingError):
-        _ = describe_available_blocks()
+        _ = describe_available_blocks(dynamic_blocks=[])
 
 
 @mock.patch.object(blocks_loader, "load_workflow_blocks")
@@ -238,4 +238,4 @@ def test_describe_available_blocks_when_plugins_duplicate_type_identifiers(
 
     # when
     with pytest.raises(PluginLoadingError):
-        _ = describe_available_blocks()
+        _ = describe_available_blocks(dynamic_blocks=[])
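`describe_available_blocks` now takes the compiled dynamic blocks explicitly, which is why every call site in these tests passes `dynamic_blocks=[]`. A minimal sketch of the updated call for a caller with no dynamic blocks, assuming the `blocks_loader` module path mirrored by the test layout above:

    from inference.core.workflows.execution_engine.introspection.blocks_loader import (
        describe_available_blocks,
    )

    # Static plugins only; dynamically defined blocks would be passed here instead
    # of the empty list.
    blocks_description = describe_available_blocks(dynamic_blocks=[])
    print(len(blocks_description.blocks), len(blocks_description.declared_kinds))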