diff --git a/ddtrace/contrib/internal/langchain/patch.py b/ddtrace/contrib/internal/langchain/patch.py index ce72e1affff..fa2332d70f2 100644 --- a/ddtrace/contrib/internal/langchain/patch.py +++ b/ddtrace/contrib/internal/langchain/patch.py @@ -1,4 +1,3 @@ -import json import os import sys from typing import Any @@ -954,17 +953,22 @@ def _on_span_started(span: Span): span.set_tag_str("langchain.request.inputs.%d.%s" % (idx, k), integration.trunc(str(v))) def _on_span_finished(span: Span, streamed_chunks): + maybe_parser = instance.steps[-1] if instance.steps else None if ( streamed_chunks and langchain_core - and isinstance(instance.steps[-1], langchain_core.output_parsers.JsonOutputParser) + and isinstance(maybe_parser, langchain_core.output_parsers.JsonOutputParser) ): - # it's possible that the chain has a json output parser - # this will have already concatenated the chunks into a json object + # it's possible that the chain has a json output parser type + # this will have already concatenated the chunks into an object - # it's also possible the json output parser isn't the last step, + # it's also possible the this parser type isn't the last step, # but one of the last steps, in which case we won't act on it here - content = json.dumps(streamed_chunks[-1]) + result = streamed_chunks[-1] + if maybe_parser.__class__.__name__ == "JsonOutputParser": + content = safe_json(result) + else: + content = str(result) else: # best effort to join chunks together content = "".join([str(chunk) for chunk in streamed_chunks]) diff --git a/releasenotes/notes/langchain-pydantic-output-parsers-19bc162212ec051e.yaml b/releasenotes/notes/langchain-pydantic-output-parsers-19bc162212ec051e.yaml new file mode 100644 index 00000000000..687e465723a --- /dev/null +++ b/releasenotes/notes/langchain-pydantic-output-parsers-19bc162212ec051e.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + langchain: resolves a JSON decoding issue resulting from tagging streamed outputs from chains ending with a PydanticOutputParser.