From 6981108d866be40952fd32257da693cd8457e299 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Fri, 16 Aug 2024 19:12:20 +0200 Subject: [PATCH] Fix bug with execution order in workflows --- inference/core/version.py | 2 +- inference/core/workflows/core_steps/common/utils.py | 11 +++++------ .../workflows/execution_engine/v1/executor/utils.py | 10 +++++----- inference_sdk/http/utils/executors.py | 13 ++----------- .../unit_tests/http/utils/test_executors.py | 2 +- 5 files changed, 14 insertions(+), 24 deletions(-) diff --git a/inference/core/version.py b/inference/core/version.py index 7e1fbb556d..8b9c9d1e0a 100644 --- a/inference/core/version.py +++ b/inference/core/version.py @@ -1,4 +1,4 @@ -__version__ = "0.16.1" +__version__ = "0.16.2" if __name__ == "__main__": diff --git a/inference/core/workflows/core_steps/common/utils.py b/inference/core/workflows/core_steps/common/utils.py index ad81d07e06..138afe9c04 100644 --- a/inference/core/workflows/core_steps/common/utils.py +++ b/inference/core/workflows/core_steps/common/utils.py @@ -1,4 +1,3 @@ -import concurrent import logging import uuid from concurrent.futures import ThreadPoolExecutor @@ -416,9 +415,9 @@ def remove_unexpected_keys_from_dictionary( def run_in_parallel(tasks: List[Callable[[], T]], max_workers: int = 1) -> List[T]: - results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(task) for task in tasks] - for future in concurrent.futures.as_completed(futures): - results.append(future.result()) - return results + return list(executor.map(_run, tasks)) + + +def _run(fun: Callable[[], T]) -> T: + return fun() diff --git a/inference/core/workflows/execution_engine/v1/executor/utils.py b/inference/core/workflows/execution_engine/v1/executor/utils.py index 20f5e9132d..2bb35e78f5 100644 --- a/inference/core/workflows/execution_engine/v1/executor/utils.py +++ b/inference/core/workflows/execution_engine/v1/executor/utils.py @@ -8,9 +8,9 @@ def run_steps_in_parallel( steps: List[Callable[[], T]], max_workers: int = 1 ) -> List[T]: - results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(step) for step in steps] - for future in concurrent.futures.as_completed(futures): - results.append(future.result()) - return results + return list(executor.map(_run, steps)) + + +def _run(fun: Callable[[], T]) -> T: + return fun() diff --git a/inference_sdk/http/utils/executors.py b/inference_sdk/http/utils/executors.py index 02fae8d4cf..1ea0faf95e 100644 --- a/inference_sdk/http/utils/executors.py +++ b/inference_sdk/http/utils/executors.py @@ -1,5 +1,4 @@ import asyncio -import concurrent import logging from concurrent.futures import ThreadPoolExecutor from enum import Enum @@ -55,17 +54,9 @@ def make_parallel_requests( request_method: RequestMethod, ) -> List[Response]: workers = len(requests_data) - results = [] + make_request_closure = partial(make_request, request_method=request_method) with ThreadPoolExecutor(max_workers=workers) as executor: - futures = [ - executor.submit( - make_request, request_data=data, request_method=request_method - ) - for data in requests_data - ] - for future in concurrent.futures.as_completed(futures): - results.append(future.result()) - return results + return list(executor.map(make_request_closure, requests_data)) @backoff.on_predicate( diff --git a/tests/inference_sdk/unit_tests/http/utils/test_executors.py b/tests/inference_sdk/unit_tests/http/utils/test_executors.py index b3b92fc990..136b728d4d 100644 --- a/tests/inference_sdk/unit_tests/http/utils/test_executors.py +++ b/tests/inference_sdk/unit_tests/http/utils/test_executors.py @@ -242,7 +242,7 @@ def test_make_parallel_requests( # then assert len(result) == 4, "Number of output responses must match number of requests" make_request_mock.assert_has_calls( - [call(request_data=request_data, request_method=RequestMethod.GET)] * 4, any_order=True + [call(request_data, request_method=RequestMethod.GET)] * 4, any_order=True ), "Mock of request method must be invoked 4 times with proper parameters"