
Streaming #1874

Merged: 23 commits merged into main on Dec 17, 2024

Conversation

@CyrusNuevoDia (Collaborator) commented on Nov 29, 2024:

dspy.streamify can be used to convert a DSPy program to streaming mode. This is useful when you want to stream
intermediate outputs (e.g., o1-style reasoning) to the client before the final prediction is ready. It uses
asyncify under the hood and inherits its execution semantics.

The deltas from every module in the program are streamed directly, with no processing; once the final prediction is ready, it is yielded.
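For reference, here's a minimal sketch of consuming the stream directly, outside a web server (assuming an LM has already been configured via dspy.configure; the question text is illustrative):

import asyncio
import dspy

stream_program = dspy.streamify(dspy.ChainOfThought("question -> answer"))

async def main():
    async for chunk in stream_program(question="What is 2 + 2?"):
        if isinstance(chunk, dspy.Prediction):
            print("final:", chunk)    # the completed prediction, yielded last
        else:
            print("delta:", chunk)    # a raw litellm.ModelResponse delta

asyncio.run(main())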

Here's how it works for deployment

import dspy
import litellm
import ujson
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

# Request body model referenced by the endpoint below.
class Question(BaseModel):
    text: str

streaming_dspy_program = dspy.streamify(dspy.ChainOfThought("question -> answer"))

@app.post("/predict/stream")
async def stream(question: Question):
    async def generate():
        async for value in streaming_dspy_program(question=question.text):
            if isinstance(value, dspy.Prediction):
                data = {"prediction": value.labels().toDict()}
            elif isinstance(value, litellm.ModelResponse):
                data = {"chunk": value.json()}
            yield f"data: {ujson.dumps(data)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

# Since you're often going to want to stream the result of a DSPy program as server-sent events,
# we've included a helper function for that, which is equivalent to the code above.

from dspy.utils.streaming import streaming_response

@app.post("/predict/stream")
async def stream(question: Question):
    stream = streaming_dspy_program(question=question.text)
    return StreamingResponse(streaming_response(stream), media_type="text/event-stream")
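On the client side, the SSE stream can be consumed like this (a sketch assuming httpx and the event format produced by the first snippet; the URL is illustrative):

import asyncio
import ujson
import httpx

async def consume():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/predict/stream", json={"text": "What is DSPy?"}
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break
                event = ujson.loads(payload)
                if "chunk" in event:
                    print("delta:", event["chunk"])        # streamed LM chunk
                elif "prediction" in event:
                    print("final:", event["prediction"])   # final prediction

asyncio.run(consume())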

Changes

  • New in-memory LMRequestLRUCache with a default max size of 10_000_000.
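For illustration, a minimal sketch of the bounded-LRU idea (not the actual LMRequestLRUCache implementation):

from collections import OrderedDict

class BoundedLRUCache(OrderedDict):
    """Evicts the least-recently-used entry once maxsize is exceeded."""

    def __init__(self, maxsize=10_000_000):
        super().__init__()
        self.maxsize = maxsize

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self.move_to_end(key)  # mark as recently used
        return value

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self.move_to_end(key)
        if len(self) > self.maxsize:
            self.popitem(last=False)  # drop the least-recently-used entry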

Notes

  • On a cache hit in the in-memory LRU cache, no intermediate deltas are streamed, because the final result is available instantly. Streaming works with the in-memory cache turned off, which enables the LiteLLM cache.

@CyrusNuevoDia requested a review from @okhat on November 29, 2024 at 02:21.
@okhat (Collaborator) commented on Dec 1, 2024:

This looks AMAZING, thanks @CyrusNuevoDia ! I'm just wrapping my head around the caching improvements here (which I quite like so far) and then will merge

@CyrusNuevoDia (Collaborator, Author) commented:
@okhat

Before: Infinite LRU caches (unbounded memory growth in prod with cache=True)
After: Bounded LRU caches

Before: Had to serialize/deserialize JSON to cache properly
After: Just need to serialize (to compute the hash key)
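
A rough sketch of the serialize-once key computation (names are illustrative, not the actual implementation):

import hashlib
import ujson

def request_key(request: dict) -> str:
    # Serialize once to derive a stable hash key; cached values are stored as-is
    # and never need to be round-tripped through JSON on reads.
    return hashlib.sha256(ujson.dumps(request, sort_keys=True).encode()).hexdigest()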

Anything else I can help clarify?

@bahtman commented on Dec 2, 2024:

Hi @CyrusNuevoDia. Sick feature, have been waiting for this! :)
Do you know whether this fixes the issue with serializing callable kwargs?
If it does not, you might be able to implement the fix built in #1862

@CyrusNuevoDia (Collaborator, Author) commented:
Hey @bahtman just added your proposed fix, and @okhat added dump/load so you can share request caches :)

(Review comments on dspy/utils/caching.py were marked outdated and resolved.)
@bahtman commented on Dec 5, 2024:

@dbczumar
Do you see my comments? It says pending when I reply.

@rohitgarud commented:
Hi @dbczumar, will it be possible to apply assertions/suggestions to the partially streamed output, so that we can stop streaming if an assertion fails and reduce token usage by retrying with the prompt amended after the assertion failure? This might be particularly useful when applying multiple assertions that assess the output along multiple dimensions.

@CyrusNuevoDia (Collaborator, Author) commented:
@rohitgarud great idea! It's out of scope for this version, but it would be awesome to have.

@CyrusNuevoDia (Collaborator, Author) commented:
@dbczumar merged in your request cache logic, getting this error on tests — any idea how to fix it?

ImportError while importing test module '/Users/knrz/Git/stanfordnlp/dspy/tests/caching/test_caching.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
../../../.rye/py/[email protected]/lib/python3.9/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
tests/caching/test_caching.py:10: in <module>
    from tests.test_utils.server import litellm_test_server, read_litellm_test_server_request_logs
E   ModuleNotFoundError: No module named 'tests.test_utils'
____________________________________________________________________________________ ERROR collecting tests/clients/test_lm.py ____________________________________________________________________________________
ImportError while importing test module '/Users/knrz/Git/stanfordnlp/dspy/tests/clients/test_lm.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
../../../.rye/py/[email protected]/lib/python3.9/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
tests/clients/test_lm.py:8: in <module>
    from tests.test_utils.server import litellm_test_server
E   ModuleNotFoundError: No module named 'tests.test_utils'
_______________________________________________________________________________________ ERROR collecting tests/reliability ________________________________________________________________________________________
../../../.rye/py/[email protected]/lib/python3.9/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
<frozen importlib._bootstrap>:1030: in _gcd_import
    ???
<frozen importlib._bootstrap>:1007: in _find_and_load
    ???
<frozen importlib._bootstrap>:986: in _find_and_load_unlocked
    ???
<frozen importlib._bootstrap>:680: in _load_unlocked
    ???
.venv/lib/python3.9/site-packages/_pytest/assertion/rewrite.py:184: in exec_module
    exec(co, module.__dict__)
tests/reliability/conftest.py:6: in <module>
    from tests.conftest import clear_settings
E   ImportError: cannot import name 'clear_settings' from 'tests.conftest' (/Users/knrz/Git/stanfordnlp/dspy/.venv/lib/python3.9/site-packages/tests/conftest.py)

@dbczumar (Collaborator) commented:
(Quoting the test-error report above.)

Thanks @CyrusNuevoDia ! I'll push some updates to remove caching changes from this PR. My reasoning is that streaming and caching aren't directly related (I added some test coverage to verify that caching works properly with streaming, though). We can make adjustments to caching in future PRs.

@@ -17,6 +18,7 @@
backoff_time=10,
callbacks=[],
async_max_workers=8,
send_stream=None,
Review comment from a collaborator on the diff above:
This is the only meaningful change in the file - everything else is a linter adjustment
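
For readers unfamiliar with the mechanism, here's a rough sketch of how a send_stream/receive_stream pair can carry deltas from a producer to a consumer (assuming anyio; this is illustrative only, not DSPy's actual internals):

import anyio

async def demo():
    # The producer (e.g., the LM client) writes deltas to send_stream;
    # the consumer (e.g., the streaming wrapper) reads them from receive_stream.
    send_stream, receive_stream = anyio.create_memory_object_stream(max_buffer_size=8)
    async with send_stream, receive_stream:
        await send_stream.send({"delta": "partial text"})
        print(await receive_stream.receive())

anyio.run(demo)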

@okhat merged commit 027312b into main on Dec 17, 2024 (6 checks passed).
@MohammedAlhajji (Contributor) commented:
Thanks @CyrusNuevoDia, this looks great. Appreciate it.
