Skip to content

Commit

Permalink
Merge pull request #585 from roboflow/fix/bug_with_order_of_execution
Browse files Browse the repository at this point in the history
Fix bug with execution order in workflows
  • Loading branch information
PawelPeczek-Roboflow authored Aug 16, 2024
2 parents 3ec1a52 + 6981108 commit 088eef6
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 24 deletions.
2 changes: 1 addition & 1 deletion inference/core/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.16.1"
__version__ = "0.16.2"


if __name__ == "__main__":
Expand Down
11 changes: 5 additions & 6 deletions inference/core/workflows/core_steps/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import concurrent
import logging
import uuid
from concurrent.futures import ThreadPoolExecutor
Expand Down Expand Up @@ -416,9 +415,9 @@ def remove_unexpected_keys_from_dictionary(


def run_in_parallel(tasks: List[Callable[[], T]], max_workers: int = 1) -> List[T]:
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(task) for task in tasks]
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
return results
return list(executor.map(_run, tasks))


def _run(fun: Callable[[], T]) -> T:
return fun()
10 changes: 5 additions & 5 deletions inference/core/workflows/execution_engine/v1/executor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
def run_steps_in_parallel(
steps: List[Callable[[], T]], max_workers: int = 1
) -> List[T]:
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(step) for step in steps]
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
return results
return list(executor.map(_run, steps))


def _run(fun: Callable[[], T]) -> T:
return fun()
13 changes: 2 additions & 11 deletions inference_sdk/http/utils/executors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import concurrent
import logging
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
Expand Down Expand Up @@ -55,17 +54,9 @@ def make_parallel_requests(
request_method: RequestMethod,
) -> List[Response]:
workers = len(requests_data)
results = []
make_request_closure = partial(make_request, request_method=request_method)
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(
make_request, request_data=data, request_method=request_method
)
for data in requests_data
]
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
return results
return list(executor.map(make_request_closure, requests_data))


@backoff.on_predicate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def test_make_parallel_requests(
# then
assert len(result) == 4, "Number of output responses must match number of requests"
make_request_mock.assert_has_calls(
[call(request_data=request_data, request_method=RequestMethod.GET)] * 4, any_order=True
[call(request_data, request_method=RequestMethod.GET)] * 4, any_order=True
), "Mock of request method must be invoked 4 times with proper parameters"


Expand Down

0 comments on commit 088eef6

Please sign in to comment.