
Commit

updated
robertgshaw2-redhat committed Dec 27, 2024
1 parent 32784a7 commit 5d40bad
Showing 4 changed files with 34 additions and 56 deletions.
2 changes: 1 addition & 1 deletion vllm/engine/multiprocessing/__init__.py
@@ -96,7 +96,7 @@ def __init__(
 
 @dataclass
 class RPCError:
-    request_id: Optional[str]
+    request_id: str
     exception: BaseException
 
 
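The effect of this change is that RPCError always carries the id of the request that failed, so the client can route the exception to exactly one per-request queue. A minimal sketch of the narrowed contract, assuming a hypothetical send_to_client callable standing in for however MQLLMEngine writes to its output socket:

from dataclasses import dataclass


@dataclass
class RPCError:
    request_id: str  # always set after this change, never None
    exception: BaseException


def report_failed_request(send_to_client, request_id: str,
                          e: BaseException) -> None:
    # The engine reports a per-request failure; the client looks up the
    # queue for request_id and re-raises the exception in generate().
    send_to_client(RPCError(request_id=request_id, exception=e))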
73 changes: 24 additions & 49 deletions vllm/engine/multiprocessing/client.py
@@ -155,60 +155,35 @@ async def run_output_handler_loop(self):
                         return
 
                 message: Frame = await self.output_socket.recv(copy=False)
-                request_outputs = pickle.loads(message.buffer)
-
-                is_error = isinstance(request_outputs,
-                                      (BaseException, RPCError))
-                if is_error:
-                    if isinstance(request_outputs, RPCError):
-                        rpc_error: RPCError = request_outputs
-                        request_id = rpc_error.request_id
-                        exception = rpc_error.exception
-                        is_engine_errored = rpc_error.is_engine_errored
-                    else:
-                        # MPLLMEngine should always return an RPCError to
-                        # the output_socket when an issue arises.
-                        # If we are here, we are in a bad state and
-                        # should shut down the server.
-                        error: BaseException = request_outputs
-                        logger.error(
-                            "Received Exception %s rather than RPCError from "
-                            "MPLLMEngine. This should never happen.", error)
-                        request_id = None
-                        exception = error
-                        is_engine_errored = True
-
-                    # Set to error state only on engine critical error
-                    # (and record only the first one)
-                    if is_engine_errored and not self._errored_with:
-                        self._errored_with = exception
-                        # If engine is errored, no matter the type of exception
-                        # it will no longer be able to receive new requests,
-                        # therefore we have to inform that the current
-                        # processed requests failed as well. Send back a dead
-                        # engine error give this feedback and also give a
-                        # 'hint' to the server to shutdown next.
-                        exception = self.dead_error
-
-                    if request_id is None:
-                        # If request_id is None, then the engine raised an
-                        # exception for a batch, and we may not know the
-                        # request that caused it, neither if it was actually
-                        # caused by any of them (e.g. CUDA OOM). Therefore we
-                        # broadcast the same exception for all requests.
-                        for queue_i in tuple(self.output_queues.values()):
-                            queue_i.put_nowait(exception)
-                    else:
-                        queue = self.output_queues.get(request_id)
-                        if queue is not None:
-                            queue.put_nowait(exception)
-                else:
+                output = pickle.loads(message.buffer)
+
+                # Occurs if there is an error in adding a new request.
+                # Note: the server can keep running if this happens,
+                # it only impacts a specific request.
+                if isinstance(output, RPCError):
+
+                    rpc_error: RPCError = output
+
+                    # Put in the queue so it can be raised in generate().
+                    queue = self.output_queues.get(
+                        rpc_error.request_id, None)
+                    if queue is not None:
+                        queue.put_nowait(rpc_error.exception)
+
+                # One request output for each item in the batch.
+                elif isinstance(output, List):
+                    request_outputs: List[RequestOutput] = output
+
                     # Put each output into the appropriate steam.
                     for request_output in request_outputs:
                         queue = self.output_queues.get(
-                            request_output.request_id)
+                            request_output.request_id, None)
                         if queue is not None:
                             queue.put_nowait(request_output)
 
+                else:
+                    self._set_errored(ValueError(
+                        f"Unknown output in handler: {output}"))
+
         except asyncio.CancelledError:
             logger.debug("Shutting down MQLLMEngineClient output handler.")
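Condensed, the new handler body reduces to a dispatch on the type of the unpickled object. A standalone sketch of that logic, assuming RPCError as defined in the sketch above, with the socket plumbing and queue bookkeeping of MQLLMEngineClient omitted:

import asyncio
from typing import Any, Dict


def dispatch(output: Any, output_queues: Dict[str, asyncio.Queue],
             set_errored) -> None:
    if isinstance(output, RPCError):
        # A single request failed (e.g. while being added); the engine keeps
        # running, so only that request's waiter is notified.
        queue = output_queues.get(output.request_id, None)
        if queue is not None:
            queue.put_nowait(output.exception)
    elif isinstance(output, list):
        # Normal case: one RequestOutput per request in the batch.
        for request_output in output:
            queue = output_queues.get(request_output.request_id, None)
            if queue is not None:
                queue.put_nowait(request_output)
    else:
        # Anything else is unexpected and moves the client to an errored state.
        set_errored(ValueError(f"Unknown output in handler: {output}"))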
9 changes: 6 additions & 3 deletions vllm/engine/multiprocessing/engine.py
@@ -96,7 +96,7 @@ def shutdown(self):
 
     @classmethod
     def from_engine_args(
-        cls,
+        cls,
         engine_args: AsyncEngineArgs,
         usage_context: UsageContext,
         input_path: str,
@@ -247,8 +247,11 @@ def run_mq_llm_engine(engine_args: AsyncEngineArgs,
     try:
         engine = MQLLMEngine.from_engine_args(engine_args, usage_context,
                                               input_path, output_path)
+        assert engine is not None # mypy
         # Send Readiness signal to EngineClient.
-        tracing_data = {"is_tracing_enabled": engine.engine.is_tracing_enabled()}
+        tracing_data = {
+            "is_tracing_enabled": engine.engine.is_tracing_enabled()
+        }
         ready_pipe.send({"status": "READY", "data": tracing_data})
         engine.run_engine_loop()
 
@@ -261,7 +264,7 @@ def run_mq_llm_engine(engine_args: AsyncEngineArgs,
     # stack trace for both startup time and runtime error.
     except Exception:
         traceback = get_exception_traceback()
-        logger.error("EngineCore hit an exception: %s", traceback)
+        logger.error("MQLLMEngine hit an exception: %s", traceback)
         parent_process.send_signal(signal.SIGQUIT)
 
     finally:
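The run_mq_llm_engine changes tighten a READY handshake: the engine process builds the engine, sends a status message (plus tracing info) back over a pipe, and on any failure tells the parent process to shut down. A self-contained sketch of that pattern, with a sleep standing in for engine.run_engine_loop() and os.kill standing in for parent_process.send_signal in the diff:

import multiprocessing as mp
import os
import signal
import time


def run_engine_proc(ready_pipe, parent_pid: int) -> None:
    try:
        # ... build the engine here (MQLLMEngine.from_engine_args in the diff) ...
        tracing_data = {"is_tracing_enabled": False}
        ready_pipe.send({"status": "READY", "data": tracing_data})
        time.sleep(0.1)  # stand-in for engine.run_engine_loop()
    except Exception:
        # Tell the parent to shut down, mirroring SIGQUIT in the diff.
        os.kill(parent_pid, signal.SIGQUIT)


if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=run_engine_proc, args=(child_conn, os.getpid()))
    proc.start()
    ready = parent_conn.recv()  # block until the child reports READY
    assert ready["status"] == "READY"
    print(ready["data"])
    proc.join()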
6 changes: 3 additions & 3 deletions vllm/entrypoints/openai/api_server.py
@@ -178,11 +178,11 @@ async def build_async_engine_client_from_engine_args(
 
         mq_engine_client: Optional[MQLLMEngineClient] = None
         try:
-            mq_engine_client = await asyncio.get_running_loop().run_in_executor(
-                None, build_mq_engine)
+            mq_engine_client = await asyncio.get_running_loop(
+            ).run_in_executor(None, build_mq_engine)
 
             yield mq_engine_client # type: ignore[misc]
 
         finally:
             # Shutdown background process + connections to backend.
             if mq_engine_client:
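The api_server.py change is only a reflow of the call that builds the MQLLMEngineClient on a thread-pool executor, so the blocking construction does not stall the event loop. A small sketch of that pattern, with a hypothetical build_mq_engine stub in place of the real builder:

import asyncio
import time


def build_mq_engine():
    # Stand-in for the blocking MQLLMEngineClient construction.
    time.sleep(1.0)
    return "mq_engine_client"


async def main() -> None:
    # Off-load the blocking builder to the default executor so the event
    # loop stays responsive while the engine client starts up.
    mq_engine_client = await asyncio.get_running_loop(
    ).run_in_executor(None, build_mq_engine)
    print("client ready:", mq_engine_client)


if __name__ == "__main__":
    asyncio.run(main())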
