Skip to content

Commit

Permalink
Merge pull request #536 from roboflow/add-workflow-benchmark
Browse files Browse the repository at this point in the history
Add workflow benchmark
  • Loading branch information
grzegorz-roboflow authored Jul 18, 2024
2 parents a3b3d50 + 7073d67 commit 2be4fb5
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 27 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/load_test_hosted_inference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ on:
- object-detection
- instance-segmentation
- classification
- workflows

jobs:
build:
Expand Down Expand Up @@ -49,6 +50,10 @@ jobs:
if: ${{ github.event.inputs.environment == 'production' && github.event.inputs.model_type == 'classification' }}
run: |
ROBOFLOW_API_KEY=${{ secrets.LOAD_TEST_PRODUCTION_API_KEY }} python -m inference_cli.main benchmark api-speed -m vehicle-classification-eapcd/2 -d coco -rps 5 -br 500 -h https://classify.roboflow.com --yes --output_location test_results.json
- name: 🏋️‍♂️ Load test 🚨 PRODUCTION 🚨 | workflows 🔥🔥🔥🔥
if: ${{ github.event.inputs.environment == 'production' && github.event.inputs.model_type == 'workflows' }}
run: |
ROBOFLOW_API_KEY=${{ secrets.LOAD_TEST_PRODUCTION_API_KEY }} python -m inference_cli.main benchmark api-speed -wid workflows-production-test -wn paul-guerrie-tang1 -d coco -rps 5 -br 500 -h https://classify.roboflow.com --yes --output_location test_results.json
- name: 🏋️‍♂️ Load test 😎 STAGING 😎 | object-detection 🔥🔥🔥🔥
if: ${{ github.event.inputs.environment == 'staging' && github.event.inputs.model_type == 'object-detection' }}
Expand All @@ -62,5 +67,9 @@ jobs:
if: ${{ github.event.inputs.environment == 'staging' && github.event.inputs.model_type == 'classification' }}
run: |
ROBOFLOW_API_KEY=${{ secrets.LOAD_TEST_STAGING_API_KEY }} python -m inference_cli.main benchmark api-speed -m catdog/28 -d coco -rps 5 -br 500 -h https://lambda-classification.staging.roboflow.com --legacy-endpoints --yes --output_location test_results.json
- name: 🏋️‍♂️ Load test 😎 STAGING 😎 | workflows 🔥🔥🔥🔥
if: ${{ github.event.inputs.environment == 'staging' && github.event.inputs.model_type == 'workflows' }}
run: |
ROBOFLOW_API_KEY=${{ secrets.LOAD_TEST_STAGING_API_KEY }} python -m inference_cli.main benchmark api-speed -wid workflows-staging-test -wn paul-guerrie -d coco -rps 5 -br 500 -h https://lambda-classification.staging.roboflow.com --legacy-endpoints --yes --output_location test_results.json
- name: 📈 RESULTS
run: cat test_results.json | jq
90 changes: 73 additions & 17 deletions inference_cli/benchmark.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import json
from typing import Optional

import typer
from typing_extensions import Annotated

from inference_cli.lib.benchmark.dataset import PREDEFINED_DATASETS
from inference_cli.lib.benchmark_adapter import (
run_api_speed_benchmark,
run_infer_api_speed_benchmark,
run_python_package_speed_benchmark,
run_workflow_api_speed_benchmark,
)

benchmark_app = typer.Typer(help="Commands for running inference benchmarks.")
Expand All @@ -15,13 +17,45 @@
@benchmark_app.command()
def api_speed(
model_id: Annotated[
str,
Optional[str],
typer.Option(
"--model_id",
"-m",
help="Model ID in format project/version.",
),
],
] = None,
workflow_id: Annotated[
Optional[str],
typer.Option(
"--workflow-id",
"-wid",
help="Workflow ID.",
),
] = None,
workspace_name: Annotated[
Optional[str],
typer.Option(
"--workspace-name",
"-wn",
help="Workspace Name.",
),
] = None,
workflow_specification: Annotated[
Optional[str],
typer.Option(
"--workflow-specification",
"-ws",
help="Workflow specification.",
),
] = None,
workflow_parameters: Annotated[
Optional[str],
typer.Option(
"--workflow-parameters",
"-wp",
help="Model ID in format project/version.",
),
] = None,
dataset_reference: Annotated[
str,
typer.Option(
Expand Down Expand Up @@ -110,20 +144,42 @@ def api_speed(
if proceed.lower() != "y":
return None
try:
run_api_speed_benchmark(
model_id=model_id,
dataset_reference=dataset_reference,
host=host,
warm_up_requests=warm_up_requests,
benchmark_requests=benchmark_requests,
request_batch_size=request_batch_size,
number_of_clients=number_of_clients,
requests_per_second=requests_per_second,
api_key=api_key,
model_configuration=model_configuration,
output_location=output_location,
enforce_legacy_endpoints=enforce_legacy_endpoints,
)
if model_id:
run_infer_api_speed_benchmark(
model_id=model_id,
dataset_reference=dataset_reference,
host=host,
warm_up_requests=warm_up_requests,
benchmark_requests=benchmark_requests,
request_batch_size=request_batch_size,
number_of_clients=number_of_clients,
requests_per_second=requests_per_second,
api_key=api_key,
model_configuration=model_configuration,
output_location=output_location,
enforce_legacy_endpoints=enforce_legacy_endpoints,
)
else:
if workflow_specification:
workflow_specification = json.loads(workflow_specification)
if workflow_parameters:
workflow_parameters = json.loads(workflow_parameters)
run_workflow_api_speed_benchmark(
workflow_id=workflow_id,
workspace_name=workspace_name,
workflow_specification=workflow_specification,
workflow_parameters=workflow_parameters,
dataset_reference=dataset_reference,
host=host,
warm_up_requests=warm_up_requests,
benchmark_requests=benchmark_requests,
request_batch_size=request_batch_size,
number_of_clients=number_of_clients,
requests_per_second=requests_per_second,
api_key=api_key,
model_configuration=model_configuration,
output_location=output_location,
)
except Exception as error:
typer.echo(f"Command failed. Cause: {error}")
raise typer.Exit(code=1)
Expand Down
151 changes: 145 additions & 6 deletions inference_cli/lib/benchmark/api_speed.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from functools import partial
from threading import Thread
from typing import Callable, List, Optional
from typing import Any, Callable, Dict, List, Optional

import numpy as np
import requests
Expand All @@ -28,7 +28,7 @@ def run_api_warm_up(
_ = client.infer(inference_input=image)


def coordinate_api_speed_benchmark(
def coordinate_infer_api_speed_benchmark(
client: InferenceHTTPClient,
images: List[np.ndarray],
model_id: str,
Expand All @@ -52,7 +52,7 @@ def coordinate_api_speed_benchmark(
target=display_benchmark_statistics, args=(results_collector,)
)
statistics_display_thread.start()
execute_api_speed_benchmark(
execute_infer_api_speed_benchmark(
results_collector=results_collector,
client=client,
images=images,
Expand All @@ -66,7 +66,96 @@ def coordinate_api_speed_benchmark(
return statistics


def execute_api_speed_benchmark(
def coordinate_workflow_api_speed_benchmark(
    client: InferenceHTTPClient,
    images: List[np.ndarray],
    workspace_name: Optional[str],
    workflow_id: Optional[str],
    # NOTE(review): the caller in benchmark.py json.loads() this value before
    # passing it, so a parsed specification dict arrives here — annotated
    # accordingly (was Optional[str]).
    workflow_specification: Optional[Dict[str, Any]],
    workflow_parameters: Optional[Dict[str, Any]],
    benchmark_requests: int,
    request_batch_size: int,
    number_of_clients: int,
    requests_per_second: Optional[int],
) -> InferenceStatistics:
    """Run the workflow API benchmark while live statistics are displayed.

    Spawns a background thread that continuously prints benchmark statistics,
    executes the benchmark requests on the calling thread, then collects the
    final statistics once the benchmark loop returns.

    Either (`workspace_name`, `workflow_id`) or `workflow_specification`
    identifies the workflow to run; `workflow_parameters` are optional
    runtime inputs forwarded with every request.

    Returns:
        Aggregated `InferenceStatistics` gathered by the results collector.
    """
    image_sizes = {i.shape[:2] for i in images}
    print(f"Detected images dimensions: {image_sizes}")
    results_collector = ResultsCollector()
    # Display runs on a separate thread so progress is visible while the
    # benchmark requests execute below.
    statistics_display_thread = Thread(
        target=display_benchmark_statistics, args=(results_collector,)
    )
    statistics_display_thread.start()
    execute_workflow_api_speed_benchmark(
        workspace_name=workspace_name,
        workflow_id=workflow_id,
        workflow_specification=workflow_specification,
        workflow_parameters=workflow_parameters,
        results_collector=results_collector,
        client=client,
        images=images,
        benchmark_requests=benchmark_requests,
        request_batch_size=request_batch_size,
        number_of_clients=number_of_clients,
        requests_per_second=requests_per_second,
    )
    statistics = results_collector.get_statistics()
    # The display thread exits after the collector is stopped by the
    # benchmark executor; join to avoid leaking the thread.
    statistics_display_thread.join()
    return statistics


def execute_infer_api_speed_benchmark(
    results_collector: ResultsCollector,
    client: InferenceHTTPClient,
    images: List[np.ndarray],
    benchmark_requests: int,
    request_batch_size: int,
    number_of_clients: int,
    requests_per_second: Optional[int],
) -> None:
    """Execute the inference-endpoint speed benchmark.

    Two execution modes:
      * `requests_per_second` set — requests are issued sequentially at the
        requested rate (`number_of_clients` is ignored, with a warning);
      * otherwise — `number_of_clients` threads each issue
        `benchmark_requests` requests as fast as they can.

    Durations and errors are reported through `results_collector`, which is
    started before and stopped after the benchmark loop.
    """
    if not images:
        # Guard: the doubling loop below would never terminate on an empty
        # list (empty + empty stays empty), hanging the benchmark.
        raise ValueError("Cannot run benchmark: no images were provided.")
    # Grow the image pool by doubling until one batch can be sliced from it.
    while len(images) < request_batch_size:
        images = images + images
    api_request_executor = partial(
        execute_infer_api_request,
        results_collector=results_collector,
        client=client,
        images=images,
        request_batch_size=request_batch_size,
        # Requests are jittered only in fixed-RPS mode.
        delay=requests_per_second is not None,
    )
    if requests_per_second is not None:
        if number_of_clients is not None:
            print(
                "Parameter specifying `number_of_clients` is ignored when number of "
                "RPS to maintain is specified."
            )
        results_collector.start_benchmark()
        execute_given_rps_sequentially(
            executor=api_request_executor,
            benchmark_requests=benchmark_requests,
            requests_per_second=requests_per_second,
        )
        results_collector.stop_benchmark()
        return None
    client_threads = []
    results_collector.start_benchmark()
    for _ in range(number_of_clients):
        client_thread = Thread(
            target=execute_requests_sequentially,
            args=(api_request_executor, benchmark_requests),
        )
        client_thread.start()
        client_threads.append(client_thread)
    for thread in client_threads:
        thread.join()
    results_collector.stop_benchmark()
    return None


def execute_workflow_api_speed_benchmark(
workspace_name: Optional[str],
workflow_id: Optional[str],
workflow_specification: Optional[str],
workflow_parameters: Optional[Dict[str, Any]],
results_collector: ResultsCollector,
client: InferenceHTTPClient,
images: List[np.ndarray],
Expand All @@ -78,7 +167,11 @@ def execute_api_speed_benchmark(
while len(images) < request_batch_size:
images = images + images
api_request_executor = partial(
execute_api_request,
execute_workflow_api_request,
workspace_name=workspace_name,
workflow_id=workflow_id,
workflow_specification=workflow_specification,
workflow_parameters=workflow_parameters,
results_collector=results_collector,
client=client,
images=images,
Expand Down Expand Up @@ -147,7 +240,7 @@ def execute_given_rps_sequentially(
thread.join()


def execute_api_request(
def execute_infer_api_request(
results_collector: ResultsCollector,
client: InferenceHTTPClient,
images: List[np.ndarray],
Expand Down Expand Up @@ -179,6 +272,52 @@ def execute_api_request(
)


def execute_workflow_api_request(
    workspace_name: Optional[str],
    workflow_id: Optional[str],
    # NOTE(review): callers pass a json.loads()-parsed specification, so this
    # is a dict here — annotation corrected from Optional[str].
    workflow_specification: Optional[Dict[str, Any]],
    workflow_parameters: Optional[Dict[str, Any]],
    results_collector: ResultsCollector,
    client: InferenceHTTPClient,
    images: List[np.ndarray],
    request_batch_size: int,
    delay: bool = False,
) -> None:
    """Issue one workflow run request and record its duration (or error).

    A random sample of `request_batch_size` images is sent under the "image"
    input name. The workflow is addressed either by (`workspace_name`,
    `workflow_id`) or by an inline `workflow_specification`. The elapsed time
    is always registered with `results_collector`; failures additionally
    register an error keyed by HTTP status code (for HTTPError) or the
    exception class name.
    """
    if delay:
        # Jitter the start slightly so concurrent clients do not fire in
        # lock-step when a fixed RPS is being maintained.
        time.sleep(random.random())
    random.shuffle(images)
    # Fixed f-string with no placeholders; also avoid rebinding `images`
    # from list to dict (shadowing made the code harder to follow).
    request_images = {"image": images[:request_batch_size]}
    start = time.time()
    try:
        kwargs: Dict[str, Any] = {
            "images": request_images,
        }
        if workflow_parameters:
            kwargs["parameters"] = workflow_parameters
        if workspace_name and workflow_id:
            # Registered workflow: address it by workspace + workflow id.
            kwargs["workspace_name"] = workspace_name
            kwargs["workflow_id"] = workflow_id
        else:
            # Ad-hoc workflow: ship the full specification inline.
            kwargs["specification"] = workflow_specification
        _ = client.run_workflow(**kwargs)
        duration = time.time() - start
        results_collector.register_inference_duration(
            batch_size=request_batch_size, duration=duration
        )
    except Exception as exc:
        # Failed requests still count towards timing statistics.
        duration = time.time() - start
        results_collector.register_inference_duration(
            batch_size=request_batch_size, duration=duration
        )
        status_code = exc.__class__.__name__
        if isinstance(exc, requests.exceptions.HTTPError):
            status_code = str(exc.response.status_code)
        results_collector.register_error(
            batch_size=request_batch_size, status_code=status_code
        )


def display_benchmark_statistics(
results_collector: ResultsCollector,
sleep_time: float = 5.0,
Expand Down
Loading

0 comments on commit 2be4fb5

Please sign in to comment.