From c3367971f0b9bd7c600f8fccf16e403ae3e01db4 Mon Sep 17 00:00:00 2001 From: Sam Brenner <106700075+sabrenner@users.noreply.github.com> Date: Wed, 11 Dec 2024 14:07:46 -0500 Subject: [PATCH] fix(langchain): pydantic output parser tagging does not throw (#11652) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MLOB-1973 ## What does this PR do? Fixes #11638 Adds some extra checking around the tagging of JSON-like output parsers in streamed cases. These kinds of output parsers concatenate their output for us, so we do not need to append a bunch of chunks together. It was previously thought that the only type was `JsonOutputParser`, which could be `json.dumps`'d as a string tag. However, the `PydanticOutputParser` inherits from `JsonOutputParser`, but cannot be JSON dumped. Thus, we just stringify it instead. To avoid this behavior of throwing in the future, I've added a `try`/`except` to the `json.dumps`. I've special-cased `PydanticOutputParser` so as not to generalize it as an expensive exception to `json.dumps`. These are the only two JSON-type output parsers I've seen, but should more be introduced, we'll log our incompatibility and just attempt to `str` it instead. ## Testing For a script like: ```python from typing import List from langchain_core.output_parsers import PydanticOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI from pydantic import BaseModel, Field class Person(BaseModel): """Information about a person.""" name: str = Field(..., description="The name of the person") height_in_meters: float = Field( ..., description="The height of the person expressed in meters." ) class People(BaseModel): """Identifying information about all people in a text.""" people: List[Person] # Set up a parser parser = PydanticOutputParser(pydantic_object=People) # Prompt prompt = ChatPromptTemplate.from_messages( [ ( "system", "Answer the user query. 
Wrap the output in `json` tags\n{format_instructions}", ), ("human", "{query}"), ] ).partial(format_instructions=parser.get_format_instructions()) query = "Anna is 23 years old and she is 6 feet tall" llm = ChatOpenAI() chain = prompt | llm | parser for event in chain.stream({ "query": query }): print(event) ``` The output tagging is as follows on APM spans: Screenshot 2024-12-10 at 12 04 57 PM and LLMObs spans: Screenshot 2024-12-10 at 12 05 17 PM without throwing errors. ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) (cherry picked from commit 9ba734f98c89c50dd4d58e7a2521e4964e1ba670) --- ddtrace/contrib/internal/langchain/patch.py | 
16 ++++++++++------ ...pydantic-output-parsers-19bc162212ec051e.yaml | 4 ++++ 2 files changed, 14 insertions(+), 6 deletions(-) create mode 100644 releasenotes/notes/langchain-pydantic-output-parsers-19bc162212ec051e.yaml diff --git a/ddtrace/contrib/internal/langchain/patch.py b/ddtrace/contrib/internal/langchain/patch.py index ce72e1affff..fa2332d70f2 100644 --- a/ddtrace/contrib/internal/langchain/patch.py +++ b/ddtrace/contrib/internal/langchain/patch.py @@ -1,4 +1,3 @@ -import json import os import sys from typing import Any @@ -954,17 +953,22 @@ def _on_span_started(span: Span): span.set_tag_str("langchain.request.inputs.%d.%s" % (idx, k), integration.trunc(str(v))) def _on_span_finished(span: Span, streamed_chunks): + maybe_parser = instance.steps[-1] if instance.steps else None if ( streamed_chunks and langchain_core - and isinstance(instance.steps[-1], langchain_core.output_parsers.JsonOutputParser) + and isinstance(maybe_parser, langchain_core.output_parsers.JsonOutputParser) ): - # it's possible that the chain has a json output parser - # this will have already concatenated the chunks into a json object + # it's possible that the chain has a json output parser type + # this will have already concatenated the chunks into an object - # it's also possible the json output parser isn't the last step, + # it's also possible the this parser type isn't the last step, # but one of the last steps, in which case we won't act on it here - content = json.dumps(streamed_chunks[-1]) + result = streamed_chunks[-1] + if maybe_parser.__class__.__name__ == "JsonOutputParser": + content = safe_json(result) + else: + content = str(result) else: # best effort to join chunks together content = "".join([str(chunk) for chunk in streamed_chunks]) diff --git a/releasenotes/notes/langchain-pydantic-output-parsers-19bc162212ec051e.yaml b/releasenotes/notes/langchain-pydantic-output-parsers-19bc162212ec051e.yaml new file mode 100644 index 00000000000..687e465723a --- /dev/null +++ 
b/releasenotes/notes/langchain-pydantic-output-parsers-19bc162212ec051e.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + langchain: resolves a JSON decoding issue resulting from tagging streamed outputs from chains ending with a PydanticOutputParser.