diff --git a/pyproject.toml b/pyproject.toml
index 9d97c5d..2627f24 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -95,7 +95,6 @@ lint.select = ["E", "F", "W"]
 
 [tool.pytest.ini_options]
 addopts = '-s -vvv --cache-clear'
-asyncio_mode = 'auto'
 markers = [
     "smoke: quick tests to check basic functionality",
     "sanity: detailed tests to ensure major functions work correctly",
diff --git a/src/guidellm/backend/base.py b/src/guidellm/backend/base.py
index 644d28c..5e135ea 100644
--- a/src/guidellm/backend/base.py
+++ b/src/guidellm/backend/base.py
@@ -89,7 +89,9 @@ def submit(self, request: TextGenerationRequest) -> TextGenerationResult:
         :rtype: TextGenerationResult
         """
 
-        logger.info(f"Submitting request with prompt: {request.prompt}")
+        logger.info(
+            f"Submitting request with prompt: {request.prompt}"
+        )
 
         result = TextGenerationResult(TextGenerationRequest(prompt=request.prompt))
         result.start(request.prompt)
diff --git a/src/guidellm/core/result.py b/src/guidellm/core/result.py
index 77477ea..1126a96 100644
--- a/src/guidellm/core/result.py
+++ b/src/guidellm/core/result.py
@@ -1,6 +1,6 @@
 from dataclasses import dataclass
 from time import perf_counter, time
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Type, Union
 
 from loguru import logger
 
@@ -270,7 +270,11 @@ class TextGenerationError:
     :type error: Exception
     """
 
-    def __init__(self, request: TextGenerationRequest, error: Exception):
+    def __init__(
+        self,
+        request: TextGenerationRequest,
+        error_class: Type[BaseException],
+    ):
         """
         Initialize the TextGenerationError with a unique identifier.
 
@@ -279,10 +283,10 @@ def __init__(self, request: TextGenerationRequest, error: Exception):
         :param error: The exception that occurred during the text generation.
         :type error: Exception
         """
-        self._request = request
-        self._error = error
+        self._request: TextGenerationRequest = request
+        self._error_class: Type[BaseException] = error_class
 
-        logger.error(f"Error occurred for request: {self._request}: {error}")
+        logger.error(f"Error occurred for request: {self._request}: {error_class}")
 
     def __repr__(self) -> str:
         """
@@ -291,7 +295,9 @@ def __repr__(self) -> str:
         :return: String representation of the TextGenerationError.
         :rtype: str
         """
-        return f"TextGenerationError(request={self._request}, error={self._error})"
+        return (
+            f"TextGenerationError(request={self._request}, error={self._error_class})"
+        )
 
     @property
     def request(self) -> TextGenerationRequest:
@@ -304,14 +310,14 @@ def request(self) -> TextGenerationRequest:
         return self._request
 
     @property
-    def error(self) -> Exception:
+    def error(self) -> Type[BaseException]:
         """
         Get the exception that occurred during the text generation.
 
         :return: The exception.
         :rtype: Exception
         """
-        return self._error
+        return self._error_class
 
 
 @dataclass
diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py
index 907c6e1..37f2aca 100644
--- a/src/guidellm/scheduler/scheduler.py
+++ b/src/guidellm/scheduler/scheduler.py
@@ -1,6 +1,8 @@
 import asyncio
 import time
-from typing import Generator, List, Optional
+from typing import Generator, List, Optional, Tuple
+
+from loguru import logger
 
 from guidellm.backend import Backend
 from guidellm.core import TextGenerationBenchmark, TextGenerationError
@@ -36,6 +38,28 @@ def __init__(
         self._max_requests = max_requests
         self._max_duration = max_duration
 
+        # Tasks that the scheduler is going to manage.
+        # NOTE: Tasks are populated in a sync/async manner and are limited by
+        # the max number of requests and the max duration of the execution.
+        self._tasks: List[Tuple[asyncio.Task, Task]] = []
+
+    def __len__(self) -> int:
+        """
+        The length of the scheduler is the total number
+        of tasks currently being processed.
+        """
+
+        return len(self._tasks)
+
+    @property
+    def event_loop(self) -> asyncio.AbstractEventLoop:
+        try:
+            loop = asyncio.get_running_loop()
+        except RuntimeError:
+            return asyncio.get_event_loop()
+        else:
+            return loop
+
     def run(self) -> TextGenerationBenchmark:
         if self._load_gen_mode == LoadGenerationMode.SYNCHRONOUS:
             report = self._run_sync()
@@ -67,6 +91,16 @@ def _run_sync(self) -> TextGenerationBenchmark:
         return benchmark
 
     async def _run_async(self) -> TextGenerationBenchmark:
+        """
+        Running in async mode performs the following steps:
+        * Iterate through all the tasks with the load attached
+        * Check that the execution time does not exceed the max duration
+        * Check that the number of requests does not exceed the max requests
+
+        If the max duration is not specified for the scheduler, only the max
+        requests limit is checked and the loop breaks without cancelling tasks.
+        """
+
         benchmark: TextGenerationBenchmark = TextGenerationBenchmark(
             mode=self._load_gen_mode.value, rate=self._load_gen_rate
         )
@@ -75,16 +109,18 @@ async def _run_async(self) -> TextGenerationBenchmark:
         load_gen = LoadGenerator(self._load_gen_mode, self._load_gen_rate)
 
         start_time: float = time.time()
-        requests_counter = 0
-        tasks: List[asyncio.Task] = []
+        requests_counter: int = 0
 
-        for _task, task_start_time in zip(self._task_iterator(), load_gen.times()):
+        for task, task_start_time in zip(self._task_iterator(), load_gen.times()):
             if (
-                self._max_requests is not None
-                and requests_counter >= self._max_requests
-            ) or (
                 self._max_duration is not None
                 and time.time() - start_time >= self._max_duration
+            ):
+                self.cancel_running_tasks(benchmark)
+                break
+            elif (
+                self._max_requests is not None
+                and requests_counter >= self._max_requests
             ):
                 break
 
@@ -93,32 +129,52 @@ async def _run_async(self) -> TextGenerationBenchmark:
             if pending_time > 0:
                 await asyncio.sleep(pending_time)
 
-            tasks.append(
-                asyncio.create_task(self._run_task_async(_task, benchmark)),
+            self._tasks.append(
+                (asyncio.create_task(self._run_task_async(task, benchmark)), task)
             )
+
             requests_counter += 1
 
         # Tasks execution strategy dispatcher
         if self._max_duration is None:
-            # Ensure all the asyncio tasks are done
-            await asyncio.gather(*tasks)
-            return benchmark
+            await asyncio.gather(
+                *(asyncio_task for asyncio_task, _ in self._tasks),
+                return_exceptions=False,
+            )
         else:
             try:
-                # Wait for tasks execution if the self.max_duration is specified
-                await asyncio.wait_for(asyncio.gather(*tasks), self._max_duration)
-            except asyncio.TimeoutError:
-                breakpoint()  # TODO: remove
-                # Return not fully filled benchmark if Task TTL is end
-                for task in tasks:
-                    if not task.done():
-                        task.cancel()
-            finally:
-                return benchmark
+                # Set the timeout if the max duration is specified
+                await asyncio.wait_for(
+                    asyncio.gather(
+                        *(asyncio_task for asyncio_task, _ in self._tasks),
+                        return_exceptions=True,
+                    ),
+                    self._max_duration,
+                )
+            except asyncio.TimeoutError:
+                self.cancel_running_tasks(benchmark)
+
+        return benchmark
+
+    def cancel_running_tasks(self, benchmark: TextGenerationBenchmark) -> None:
+        """
+        Cancel all the running tasks for the scheduler.
+        """
+
+        for asyncio_task, guidellm_task in self._tasks:
+            if not asyncio_task.done():
+                logger.debug(f"Cancelling running task {asyncio_task}")
+                asyncio_task.cancel()
+                benchmark.errors.append(
+                    TextGenerationError(
+                        **guidellm_task._params,
+                        error_class=asyncio.CancelledError,
+                    )
+                )
 
     async def _run_task_async(self, task: Task, benchmark: TextGenerationBenchmark):
         benchmark.request_started()
-        res = await task.run_async()
+        res = await task.run_async(self.event_loop)
         benchmark.request_completed(res)
 
     def _task_iterator(self) -> Generator[Task, None, None]:
diff --git a/src/guidellm/scheduler/task.py b/src/guidellm/scheduler/task.py
index 473fafe..6dc350c 100644
--- a/src/guidellm/scheduler/task.py
+++ b/src/guidellm/scheduler/task.py
@@ -36,39 +36,26 @@ def __init__(
             f"params: {self._params}"
         )
 
-    async def run_async(self) -> Any:
+    async def run_async(self, event_loop: asyncio.AbstractEventLoop) -> Any:
         """
         Run the task asynchronously.
 
         :return: The output of the function.
         :rtype: Any
         """
+
         logger.info(f"Running task asynchronously with function: {self._func.__name__}")
 
-        try:
-            loop = asyncio.get_running_loop()
-            result = await asyncio.gather(
-                loop.run_in_executor(
-                    None, functools.partial(self._func, **self._params)
-                ),
-                return_exceptions=True,
+        try:
+            result = await event_loop.run_in_executor(
+                None, functools.partial(self._func, **self._params)
             )
-            if isinstance(result[0], Exception):
-                raise result[0]
+            if isinstance(result, Exception):
+                raise result
 
-            if self.cancelled is True:
-                raise asyncio.CancelledError("Task was cancelled")
-
-            logger.info(f"Task completed with result: {result[0]}")
+            logger.info(f"Task completed with result: {result}")
 
-            return result[0]
-        except asyncio.CancelledError as cancel_err:
-            logger.warning("Task was cancelled")
-            return (
-                cancel_err
-                if not self._err_container
-                else self._err_container(**self._params, error=cancel_err)
-            )
+            return result
         except Exception as err:
             logger.error(f"Task failed with error: {err}")
             return (
@@ -96,26 +83,3 @@ def run_sync(self) -> Any:
             if not self._err_container
             else self._err_container(**self._params, error=err)
         )
-
-    def cancel(self) -> None:
-        """
-        Cancel the task.
-        """
-        logger.info("Cancelling task")
-        self._cancel_event.set()
-
-    async def _check_cancelled(self):
-        """
-        Check if the task is cancelled.
-        """
-        await self._cancel_event.wait()
-
-    @property
-    def cancelled(self) -> bool:
-        """
-        Check if the task is cancelled.
-
-        :return: True if the task is cancelled, False otherwise.
-        :rtype: bool
-        """
-        return self._cancel_event.is_set()
diff --git a/tests/unit/backend/test_openai_backend.py b/tests/unit/backend/test_openai_backend.py
index 442d1bc..f4e55f2 100644
--- a/tests/unit/backend/test_openai_backend.py
+++ b/tests/unit/backend/test_openai_backend.py
@@ -79,7 +79,6 @@ def test_make_request(openai_backend_factory, openai_completion_create_patch):
         backend_service.make_request(request=request),
         openai_completion_create_patch,
     ):
-
         total_generative_responses += 1
         expected_token: Optional[str] = getattr(completion_patch, "content") or None
 
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index 834e616..8449426 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -21,22 +21,6 @@ def openai_completion_create_patch(
     return cast(openai.Stream[openai.types.Completion], items)
 
 
-@pytest.fixture(autouse=True)
-def openai_async_completion_create_patch(
-    mocker,
-):
-    """
-    Mock available models function to avoid OpenAI API call.
-    """
-
-    async def callback(*args, **kwargs):
-        return dummy.data.openai_completion_factory_async()
-
-    return mocker.patch(
-        "openai.resources.completions.AsyncCompletions.create", side_effect=callback
-    )
-
-
 @pytest.fixture(autouse=True)
 def openai_models_list_patch(mocker) -> List[openai.types.Model]:
     """
diff --git a/tests/unit/executor/test_report_generation.py b/tests/unit/executor/test_report_generation.py
index 43dde54..16c90c1 100644
--- a/tests/unit/executor/test_report_generation.py
+++ b/tests/unit/executor/test_report_generation.py
@@ -45,6 +45,9 @@ def test_executor_openai_single_report_generation_constant_mode_infinite(
 ):
     """
     Test without max duration defined.
+
+    No matter how many requests are specified,
+    the execution DOES NOT have any duration limitation.
     """
 
     request_genrator = dummy.services.TestRequestGenerator(
@@ -62,14 +65,15 @@ def test_executor_openai_single_report_generation_constant_mode_infinite(
         profile_mode=profile_generation_mode,
         profile_args=profile_generator_kwargs,
         max_requests=2,
-        max_duration=None,
+        max_duration=None,  # not specified, so there is no duration limitation
     )
 
     report: TextGenerationBenchmarkReport = executor.run()
 
     assert isinstance(executor.backend, OpenAIBackend)
     assert len(report.benchmarks) == 1
-    assert len(report.benchmarks[0].results) == 2
+    assert len(report.benchmarks[0].results) == len(executor.scheduler)
+    assert len(report.benchmarks[0].errors) == 0
 
 
 @pytest.mark.sanity
@@ -106,11 +110,12 @@ def test_executor_openai_single_report_generation_constant_mode_limited(
 
 
 @pytest.mark.sanity
-def test_executor_openai_single_report_generation_constant_mode_cancelled(
+def test_executor_openai_single_report_generation_constant_mode_failed(
     openai_backend_factory,
 ):
     """
-    Test max duration immediate cancellation.
+    Test that the max duration breaks up the tasks iteration immediately
+    because of `time.time() - start_time >= self._max_duration`.
     """
 
     request_genrator = dummy.services.TestRequestGenerator(
@@ -128,16 +133,17 @@ def test_executor_openai_single_report_generation_constant_mode_cancelled(
         profile_mode=profile_generation_mode,
         profile_args=profile_generator_kwargs,
         max_requests=10,
-        max_duration=0.1,  # immediately stop the execution
+        max_duration=0,  # immediately stop the execution
     )
 
     report: TextGenerationBenchmarkReport = executor.run()
 
     assert isinstance(executor.backend, OpenAIBackend)
     assert len(report.benchmarks) == 1
-    assert report.benchmarks[0].results
+    assert report.benchmarks[0].results == []
 
 
+@pytest.mark.sanity
 def test_executor_openai_single_report_generation_constant_mode_cancelled_reports(
     openai_backend_factory,
 ):
     """
@@ -155,12 +161,14 @@ def test_executor_openai_single_report_generation_constant_mode_cancelled_report
         request_generator=request_genrator,
         profile_mode=profile_generation_mode,
         profile_args=profile_generator_kwargs,
-        max_requests=10,
-        max_duration=5,  # expected 6 tasks to be started for that condition
+        max_requests=5,
+        max_duration=3,
     )
 
     report: TextGenerationBenchmarkReport = executor.run()
 
     assert isinstance(executor.backend, OpenAIBackend)
     assert len(report.benchmarks) == 1
-    assert len(report.benchmarks[0].results) == 6
+    assert len(report.benchmarks[0].results) + len(report.benchmarks[0].errors) == len(
+        executor.scheduler
+    )
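
Not part of the patch: a minimal, self-contained sketch of how the reworked `Task.run_async(event_loop)` signature and the timeout/cancellation path in `Scheduler._run_async` are meant to interact. `MiniTask`, `echo`, and the 5-second timeout are illustrative assumptions for this sketch, not guidellm APIs.

import asyncio
import functools
from typing import Any, Callable, Dict


class MiniTask:
    """Illustrative stand-in for guidellm's Task: wraps a blocking callable."""

    def __init__(self, func: Callable[..., Any], params: Dict[str, Any]) -> None:
        self._func = func
        self._params = params

    async def run_async(self, event_loop: asyncio.AbstractEventLoop) -> Any:
        # Offload the blocking call to the default executor, mirroring the
        # event_loop.run_in_executor call used by the patched run_async.
        return await event_loop.run_in_executor(
            None, functools.partial(self._func, **self._params)
        )


def echo(request: str) -> str:
    # Hypothetical blocking "backend" call used only for this sketch.
    return f"completed: {request}"


async def main() -> None:
    loop = asyncio.get_running_loop()
    tasks = [
        asyncio.create_task(MiniTask(echo, {"request": f"req-{i}"}).run_async(loop))
        for i in range(3)
    ]
    try:
        # Mirror the max_duration branch: bound the whole gather with wait_for.
        results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=5)
    except asyncio.TimeoutError:
        # Mirror Scheduler.cancel_running_tasks: cancel whatever is still pending.
        for pending in (t for t in tasks if not t.done()):
            pending.cancel()
        results = []
    print(results)


if __name__ == "__main__":
    asyncio.run(main())

The sketch catches `asyncio.TimeoutError` rather than the builtin `TimeoutError` because `asyncio.wait_for` raises the asyncio-specific exception on Python versions before 3.11, where the two are still distinct.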