fix(langchain): pydantic output parser tagging does not throw (#11652)
MLOB-1973

## What does this PR do?
Fixes #11638

Adds extra checking around the tagging of JSON-like output parsers in
streamed cases. These output parsers concatenate their output for us, so
we do not need to join the chunks ourselves. It was previously assumed
that the only such type was `JsonOutputParser`, whose output could be
`json.dumps`'d into a string tag. However, `PydanticOutputParser`
inherits from `JsonOutputParser`, yet its output (a Pydantic model
instance) cannot be JSON-dumped. In that case we stringify the result instead.
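
As background, `json.dumps` raises `TypeError` for any object that is not JSON-native, which is exactly what happens with a Pydantic model instance. A minimal stdlib-only sketch of the failure mode (a plain class stands in for the Pydantic model):

```python
import json

class Person:
    """Stand-in for a Pydantic model instance: not JSON-serializable."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __str__(self) -> str:
        return "Person(name=%r)" % self.name

parsed = Person("Anna")

try:
    content = json.dumps(parsed)  # raises TypeError: not JSON serializable
except TypeError:
    content = str(parsed)  # stringifying always succeeds

print(content)  # Person(name='Anna')
```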

To guard against throwing in the future, I've wrapped the `json.dumps`
call in a `try`/`except`. I've also special-cased
`PydanticOutputParser` so that its output doesn't incur a
guaranteed-to-fail, expensive `json.dumps` call. These are the only two
JSON-type output parsers I've seen, but should more be introduced,
we'll log the incompatibility and fall back to `str` instead.
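
The guarded dump can be sketched as a small helper; `safe_json_dump` below is a hypothetical name illustrating the pattern, not the actual ddtrace helper:

```python
import json
import logging

log = logging.getLogger(__name__)

def safe_json_dump(obj):
    """Try json.dumps; on failure, log the incompatibility and fall back to str()."""
    try:
        return json.dumps(obj)
    except (TypeError, ValueError):
        log.warning("Unable to JSON-encode %s output; stringifying instead", type(obj).__name__)
        return str(obj)

# JSON-native values serialize normally...
print(safe_json_dump({"name": "Anna"}))  # {"name": "Anna"}
# ...while arbitrary objects fall back to str() instead of raising
print(safe_json_dump(object()))
```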

## Testing
For a script like:
```python
from typing import List

from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

class Person(BaseModel):
    """Information about a person."""

    name: str = Field(..., description="The name of the person")
    height_in_meters: float = Field(
        ..., description="The height of the person expressed in meters."
    )

class People(BaseModel):
    """Identifying information about all people in a text."""

    people: List[Person]

# Set up a parser
parser = PydanticOutputParser(pydantic_object=People)

# Prompt
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Answer the user query. Wrap the output in `json` tags\n{format_instructions}",
        ),
        ("human", "{query}"),
    ]
).partial(format_instructions=parser.get_format_instructions())

query = "Anna is 23 years old and she is 6 feet tall"

llm = ChatOpenAI()

chain = prompt | llm | parser

for event in chain.stream({ "query": query }):
    print(event)
```
The output tagging is as follows on APM spans:
<img width="530" alt="Screenshot 2024-12-10 at 12 04 57 PM"
src="https://github.com/user-attachments/assets/623b778a-bd6b-45ef-b56c-d0c3d2ada9ae">

and LLMObs spans:
<img width="714" alt="Screenshot 2024-12-10 at 12 05 17 PM"
src="https://github.com/user-attachments/assets/77bce189-a4ce-40a6-bd9f-fd2a189184f5">

without throwing errors.

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

(cherry picked from commit 9ba734f)

sabrenner authored and github-actions[bot] committed Dec 11, 2024
1 parent d5f9c27, commit c336797
Showing 2 changed files with 14 additions and 6 deletions.
`ddtrace/contrib/internal/langchain/patch.py` (10 additions, 6 deletions):

```diff
@@ -1,4 +1,3 @@
-import json
 import os
 import sys
 from typing import Any
@@ -954,17 +953,22 @@ def _on_span_started(span: Span):
         span.set_tag_str("langchain.request.inputs.%d.%s" % (idx, k), integration.trunc(str(v)))

     def _on_span_finished(span: Span, streamed_chunks):
+        maybe_parser = instance.steps[-1] if instance.steps else None
         if (
             streamed_chunks
             and langchain_core
-            and isinstance(instance.steps[-1], langchain_core.output_parsers.JsonOutputParser)
+            and isinstance(maybe_parser, langchain_core.output_parsers.JsonOutputParser)
         ):
-            # it's possible that the chain has a json output parser
-            # this will have already concatenated the chunks into a json object
+            # it's possible that the chain has a json output parser type
+            # this will have already concatenated the chunks into an object

-            # it's also possible the json output parser isn't the last step,
+            # it's also possible this parser type isn't the last step,
             # but one of the last steps, in which case we won't act on it here
-            content = json.dumps(streamed_chunks[-1])
+            result = streamed_chunks[-1]
+            if maybe_parser.__class__.__name__ == "JsonOutputParser":
+                content = safe_json(result)
+            else:
+                content = str(result)
         else:
             # best effort to join chunks together
             content = "".join([str(chunk) for chunk in streamed_chunks])
```
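
To make the branching in `_on_span_finished` concrete, it can be exercised standalone; the parser classes below are stand-ins that mirror the real `langchain_core` hierarchy, not the actual classes:

```python
import json

class JsonOutputParser:
    """Stand-in for langchain_core.output_parsers.JsonOutputParser."""

class PydanticOutputParser(JsonOutputParser):
    """Stand-in subclass, mirroring the real inheritance."""

def tag_streamed_output(parser, streamed_chunks):
    """Sketch of the branching used when tagging streamed chain output."""
    if streamed_chunks and isinstance(parser, JsonOutputParser):
        # JSON-like parsers concatenate chunks for us; take the final value
        result = streamed_chunks[-1]
        if parser.__class__.__name__ == "JsonOutputParser":
            return json.dumps(result)  # JSON-native output
        return str(result)  # Pydantic (or other) output: stringify
    # best effort: join the chunks ourselves
    return "".join(str(chunk) for chunk in streamed_chunks)

print(tag_streamed_output(JsonOutputParser(), [{"a": 1}]))      # {"a": 1}
print(tag_streamed_output(PydanticOutputParser(), [{"a": 1}]))  # {'a': 1}
```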
New release note file (4 additions):

```diff
@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    langchain: resolves a JSON decoding issue resulting from tagging streamed outputs from chains ending with a PydanticOutputParser.
```
