
Commit 32784a7: make KeyboardInterrupt less painful

robertgshaw2-redhat committed Dec 27, 2024 (parent: afa8de2)

Showing 3 changed files with 36 additions and 24 deletions.
vllm/engine/multiprocessing/client.py (14 changes: 10 additions & 4 deletions)
@@ -213,13 +213,19 @@ async def run_output_handler_loop(self):
         except asyncio.CancelledError:
             logger.debug("Shutting down MQLLMEngineClient output handler.")
 
-    def close(self):
-        """Destroy the ZeroMQ Context."""
+    def shutdown(self):
+        """Destroy the MQLLMEngine."""
+
+        # Shutdown the background process.
+        if hasattr(self, "engine_proc_handler"):
+            self.engine_proc_handler.shutdown()
 
         # Close all sockets and terminate the context.
-        self.ctx.destroy(linger=0)
+        if hasattr(self, "ctx"):
+            self.ctx.destroy(linger=0)
 
         # Cancel background tasks.
-        if self.output_loop is not None:
+        if hasattr(self, "output_loop") and self.output_loop:
             self.output_loop.cancel()
 
     def _set_errored(self, e: BaseException):
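The hasattr() guards above matter because the client may be torn down while only partially constructed, for example when Ctrl-C lands during startup before every attribute is assigned. A minimal sketch of the pattern, using a hypothetical Client class rather than vLLM's actual MQLLMEngineClient:

class Client:
    def __init__(self, fail_early: bool = False):
        self.ctx = "zmq-context-stand-in"
        if fail_early:
            raise KeyboardInterrupt  # simulate Ctrl-C arriving mid-construction
        self.output_loop = "output-handler-task-stand-in"

    def shutdown(self):
        # hasattr() guards keep shutdown() safe on a half-built instance.
        if hasattr(self, "ctx"):
            print("destroying", self.ctx)
        if hasattr(self, "output_loop") and self.output_loop:
            print("cancelling", self.output_loop)


# Even when __init__ is cut short, shutdown() does not raise AttributeError.
half_built = Client.__new__(Client)
try:
    half_built.__init__(fail_early=True)
except KeyboardInterrupt:
    pass
half_built.shutdown()  # only the ctx line is printed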
vllm/engine/multiprocessing/engine.py (25 changes: 15 additions & 10 deletions)
@@ -95,8 +95,13 @@ def shutdown(self):
         del self.engine
 
     @classmethod
-    def from_engine_args(cls, engine_args: AsyncEngineArgs,
-                         usage_context: UsageContext, ipc_path: str):
+    def from_engine_args(
+        cls,
+        engine_args: AsyncEngineArgs,
+        usage_context: UsageContext,
+        input_path: str,
+        output_path: str,
+    ):
         """Creates an MQLLMEngine from the engine arguments."""
         # Setup plugins for each process
         from vllm.plugins import load_general_plugins
@@ -107,7 +112,8 @@ def from_engine_args(cls, engine_args: AsyncEngineArgs,
 
         use_async_sockets = engine_config.model_config.use_async_output_proc
 
-        return cls(ipc_path=ipc_path,
+        return cls(input_path=input_path,
+                   output_path=output_path,
                    use_async_sockets=use_async_sockets,
                    vllm_config=engine_config,
                    executor_class=executor_class,
@@ -237,19 +243,18 @@ def run_mq_llm_engine(engine_args: AsyncEngineArgs,
     signal.signal(signal.SIGTERM, signal_handler)
 
     parent_process = psutil.Process().parent()
-    engine: Optional[MQLLMEngine]
+    engine: Optional[MQLLMEngine] = None
     try:
         engine = MQLLMEngine.from_engine_args(engine_args, usage_context,
                                               input_path, output_path)
         # Send Readiness signal to EngineClient.
-        ready_pipe.send({
-            "status": "READY",
-            "data": {
-                "is_tracing_enabled": engine.engine.is_tracing_enabled()
-            }
-        })
+        tracing_data = {"is_tracing_enabled": engine.engine.is_tracing_enabled()}
+        ready_pipe.send({"status": "READY", "data": tracing_data})
         engine.run_engine_loop()
 
+    except KeyboardInterrupt:
+        raise
+
     # If an exception arises, log the error and raise a SIGQUIT.
     # The parent process will listen for SIGQUIT and shutdown if
     # it arises. The root cause will show up at the bottom of the

Check failure from GitHub Actions / ruff (3.12) on the tracing_data line above: vllm/engine/multiprocessing/engine.py:251:81: E501 Line too long (85 > 80).
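The new except KeyboardInterrupt: raise clause lets Ctrl-C unwind the engine process without taking the error path described in the comments at the end of this hunk (log the error, then raise SIGQUIT so the parent shuts down). A minimal sketch of that control flow, with hypothetical names standing in for vLLM's actual engine module:

import os
import signal
import traceback


def run_engine_loop() -> None:
    # Stand-in for engine.run_engine_loop(); may raise at any point.
    raise RuntimeError("simulated engine failure")


def run_worker(parent_pid: int) -> None:
    try:
        run_engine_loop()
    except KeyboardInterrupt:
        # Ctrl-C reaches every process in the group; just unwind quietly and
        # let the parent drive shutdown through its normal cleanup path.
        raise
    except Exception:
        # Unexpected failures are still surfaced, then escalated to the
        # parent, which listens for SIGQUIT and shuts everything down
        # (POSIX-only in this sketch).
        traceback.print_exc()
        os.kill(parent_pid, signal.SIGQUIT)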
vllm/entrypoints/openai/api_server.py (21 changes: 11 additions & 10 deletions)
@@ -172,20 +172,21 @@ async def build_async_engine_client_from_engine_args(
             "you will find inaccurate metrics. Unset the variable "
             "and vLLM will properly handle cleanup.")
 
+    build_mq_engine = partial(MQLLMEngineClient,
+                              engine_args=engine_args,
+                              usage_context=UsageContext.OPENAI_API_SERVER)
+
+    mq_engine_client: Optional[MQLLMEngineClient] = None
     try:
-        build_engine = partial(
-            MQLLMEngineClient,
-            engine_args=engine_args,
-            usage_context=UsageContext.OPENAI_API_SERVER)
-        engine_client = await asyncio.get_running_loop().run_in_executor(
-            None, build_engine)
-        mq_engine_client = MQLLMEngineClient(engine_args)
+        mq_engine_client = await asyncio.get_running_loop().run_in_executor(
+            None, build_mq_engine)
 
         yield mq_engine_client  # type: ignore[misc]
 
     finally:
-        # Close all open connections to the backend
-        mq_engine_client.close()
+        # Shutdown background process + connections to backend.
+        if mq_engine_client:
+            mq_engine_client.shutdown()
 
     # Lazy import for prometheus multiprocessing.
     # We need to set PROMETHEUS_MULTIPROC_DIR environment variable
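The api_server.py change follows a build-then-guard pattern: construct the client off the event loop via run_in_executor, track it in an Optional, and only call shutdown() in finally if construction actually finished (so a Ctrl-C during startup does not trip over a client that never existed). A self-contained sketch under those assumptions, with a dummy Client in place of MQLLMEngineClient:

import asyncio
import time
from contextlib import asynccontextmanager
from functools import partial
from typing import AsyncIterator, Optional


class Client:
    def __init__(self, model: str):
        time.sleep(0.1)  # stand-in for slow, blocking startup work
        self.model = model

    def shutdown(self) -> None:
        print(f"shutting down client for {self.model}")


@asynccontextmanager
async def build_client(model: str) -> AsyncIterator[Client]:
    build = partial(Client, model=model)
    client: Optional[Client] = None
    try:
        # Run the blocking constructor in a thread so the loop stays responsive.
        client = await asyncio.get_running_loop().run_in_executor(None, build)
        yield client
    finally:
        # Only tear down what was actually built.
        if client:
            client.shutdown()


async def main() -> None:
    async with build_client("demo-model") as client:
        print("client ready:", client.model)


asyncio.run(main())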
