Skip to content

Commit

Permalink
Refactor MarkdownStream logic
Browse files Browse the repository at this point in the history
  • Loading branch information
cpsievert committed Jan 31, 2025
1 parent 570335c commit 7c0a536
Showing 1 changed file with 25 additions and 33 deletions.
58 changes: 25 additions & 33 deletions shiny/ui/_markdown_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ async def stream(
@reactive.extended_task
async def _task():
if clear:
await self._replace("")
await self._send_content_message("", "replace")
async with self._streaming_dot():
async for c in content:
await self._append(c)
await self._send_content_message(c, "append")

_task()

Expand All @@ -163,40 +163,39 @@ async def _handle_error():
await self._raise_exception(e)
_handle_error.destroy() # type: ignore

async def _append(self, content: str):
msg: ContentMessage = {
"id": self.id,
"content": content,
"operation": "append",
}

await self._send_custom_message(msg)
@asynccontextmanager
async def _streaming_dot(self):
await self._send_stream_message(True)
try:
yield
finally:
await self._send_stream_message(False)

async def _replace(self, content: str):
async def _send_content_message(
self,
content: str,
operation: Literal["append", "replace"],
):
msg: ContentMessage = {
"id": self.id,
"content": content,
"operation": "replace",
"operation": operation,
}

await self._send_custom_message(msg)

@asynccontextmanager
async def _streaming_dot(self):
start: isStreamingMessage = {
async def _send_stream_message(self, is_streaming: bool):
msg: isStreamingMessage = {
"id": self.id,
"isStreaming": True,
"isStreaming": is_streaming,
}
await self._send_custom_message(start)
await self._send_custom_message(msg)

try:
yield
finally:
end: isStreamingMessage = {
"id": self.id,
"isStreaming": False,
}
await self._send_custom_message(end)
async def _send_custom_message(
self, msg: Union[ContentMessage, isStreamingMessage]
):
if self._session.is_stub_session():
return
await self._session.send_custom_message("shinyMarkdownStreamMessage", {**msg})

async def _raise_exception(self, e: BaseException):
if self.on_error == "unhandled":
Expand All @@ -206,13 +205,6 @@ async def _raise_exception(self, e: BaseException):
msg = f"Error in MarkdownStream('{self.id}'): {str(e)}"
raise NotifyException(msg, sanitize=sanitize) from e

async def _send_custom_message(
self, msg: Union[ContentMessage, isStreamingMessage]
):
if self._session.is_stub_session():
return
await self._session.send_custom_message("shinyMarkdownStreamMessage", msg)


@add_example()
def markdown_stream_ui(
Expand Down

0 comments on commit 7c0a536

Please sign in to comment.