feat(tracing): add anthropic streaming support #9471

Merged · 48 commits · Jun 11, 2024

Changes from 39 commits

Commits
168827e
initial commit
wconti27 Jun 3, 2024
3a568de
add riotfile change
wconti27 Jun 3, 2024
159cadc
llm obs integration
wconti27 Jun 3, 2024
11a6861
add tests
wconti27 Jun 3, 2024
716b6ba
more clean up
wconti27 Jun 3, 2024
92baacc
more changes
wconti27 Jun 3, 2024
459f6bb
add init comment
wconti27 Jun 3, 2024
d25e7b8
reduce max tokens
wconti27 Jun 3, 2024
2cda152
add async support
wconti27 Jun 3, 2024
f093d53
add release note
wconti27 Jun 3, 2024
95dc7d4
add async
wconti27 Jun 3, 2024
2d50206
merge in async support
wconti27 Jun 3, 2024
e2b294f
add async llm tags
wconti27 Jun 3, 2024
9ab3ce9
add streaming
wconti27 Jun 3, 2024
50d1906
fix async error code
wconti27 Jun 4, 2024
3773cfa
add more tests
wconti27 Jun 4, 2024
a5ffc9c
fix riotfile
wconti27 Jun 4, 2024
569bdce
add tools and refactor PR
wconti27 Jun 4, 2024
785ada9
add tools tests
wconti27 Jun 4, 2024
d90c92a
Merge branch 'main' into conti/add-anthropic-patch
wconti27 Jun 4, 2024
cd8cc39
update riotfile
wconti27 Jun 4, 2024
d16c466
Merge branch 'conti/add-anthropic-patch' into conti/add-anthropic-mlo…
wconti27 Jun 4, 2024
8d7f1a5
merge
wconti27 Jun 4, 2024
8c75f65
fix snapshots / cassettes
wconti27 Jun 4, 2024
28d8afd
Merge branch 'conti/add-anthropic-patch' into conti/add-anthropic-mlo…
wconti27 Jun 4, 2024
bedf3c0
merge
wconti27 Jun 4, 2024
068c2c3
merge with main
wconti27 Jun 5, 2024
c2684c1
add system prompt
wconti27 Jun 5, 2024
25ec8ce
Merge branch 'main' into conti/add-anthropic-mlobs-methods
wconti27 Jun 5, 2024
2d6d786
Merge branch 'conti/add-anthropic-mlobs-methods' into conti/add-anthr…
wconti27 Jun 5, 2024
1d3044e
small fixes
wconti27 Jun 5, 2024
0fc7297
small fix
wconti27 Jun 5, 2024
674a1dc
Merge branch 'conti/add-anthropic-mlobs-methods' of github.com:DataDo…
wconti27 Jun 5, 2024
f657c8f
changes
wconti27 Jun 5, 2024
fd397c4
merge
wconti27 Jun 5, 2024
f63c2e4
add release note
wconti27 Jun 5, 2024
e8fe53a
merge with main
wconti27 Jun 6, 2024
bf0b521
remove unnecessary lock files
wconti27 Jun 6, 2024
e630709
add release note
wconti27 Jun 6, 2024
8f5e76d
Update releasenotes/notes/add-anthropic-streaming-support-01937d2e524…
wconti27 Jun 7, 2024
1c1df2f
more changes
wconti27 Jun 7, 2024
613bf82
fix function signature
wconti27 Jun 7, 2024
fafd195
remove content block start
wconti27 Jun 11, 2024
952ed59
more tests
wconti27 Jun 11, 2024
717bd6f
Merge branch 'main' into conti/add-anthropic-streaming-support
wconti27 Jun 11, 2024
841015c
add more tests
wconti27 Jun 11, 2024
d47ead7
merge
wconti27 Jun 11, 2024
cd3b1f1
Merge branch 'main' into conti/add-anthropic-streaming-support
wconti27 Jun 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 322 additions & 0 deletions ddtrace/contrib/anthropic/_streaming.py
@@ -0,0 +1,322 @@
import sys
from typing import Any
from typing import Dict

from ddtrace.internal.logger import get_logger
from ddtrace.vendor import wrapt

from .utils import _get_attr


log = get_logger(__name__)


def handle_streamed_response(integration, resp, args, kwargs, span):
if _is_stream(resp):
return TracedAnthropicStream(resp, integration, span, args, kwargs)
elif _is_async_stream(resp):
return TracedAnthropicAsyncStream(resp, integration, span, args, kwargs)
elif _is_stream_manager(resp):
return TracedAnthropicStreamManager(resp, integration, span, args, kwargs)
elif _is_async_stream_manager(resp):
return TracedAnthropicAsyncStreamManager(resp, integration, span, args, kwargs)
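
# For orientation, a rough sketch of the call shapes that produce each wrapped type
# (based on the anthropic SDK; these calls are not part of this module):
#   client.messages.create(..., stream=True)        -> anthropic.Stream
#   async_client.messages.create(..., stream=True)  -> anthropic.AsyncStream
#   client.messages.stream(...)                     -> anthropic.MessageStreamManager
#   async_client.messages.stream(...)               -> anthropic.AsyncMessageStreamManager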


class BaseTracedAnthropicStream(wrapt.ObjectProxy):
    def __init__(self, wrapped, integration, span, args, kwargs):
        super().__init__(wrapped)
        self._dd_span = span
        # chunks are accumulated as the stream is consumed and assembled into a
        # response message once the stream is exhausted
        self._streamed_chunks = []
        self._dd_integration = integration
        self._kwargs = kwargs
        self._args = args


class TracedAnthropicStream(BaseTracedAnthropicStream):
def __enter__(self):
self.__wrapped__.__enter__()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.__wrapped__.__exit__(exc_type, exc_val, exc_tb)

def __iter__(self):
return self

def __next__(self):
try:
chunk = self.__wrapped__.__next__()
self._streamed_chunks.append(chunk)
return chunk
except StopIteration:
_process_finished_stream(
self._dd_integration, self._dd_span, self._args, self._kwargs, self._streamed_chunks
)
self._dd_span.finish()
raise
except Exception:
self._dd_span.set_exc_info(*sys.exc_info())
self._dd_span.finish()
raise

def _text_stream(self):
for chunk in self:
if chunk.type == "content_block_delta" and chunk.delta.type == "text_delta":
yield chunk.delta.text


class TracedAnthropicAsyncStream(BaseTracedAnthropicStream):
async def __aenter__(self):
await self.__wrapped__.__aenter__()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)

def __aiter__(self):
return self

async def __anext__(self):
try:
chunk = await self.__wrapped__.__anext__()
self._streamed_chunks.append(chunk)
return chunk
except StopAsyncIteration:
_process_finished_stream(
self._dd_integration,
self._dd_span,
self._args,
self._kwargs,
self._streamed_chunks,
)
self._dd_span.finish()
raise
except Exception:
self._dd_span.set_exc_info(*sys.exc_info())
self._dd_span.finish()
raise

async def _text_stream(self):
async for chunk in self:
if chunk.type == "content_block_delta" and chunk.delta.type == "text_delta":
yield chunk.delta.text


class TracedAnthropicStreamManager(BaseTracedAnthropicStream):
def __enter__(self):
stream = self.__wrapped__.__enter__()
traced_stream = TracedAnthropicStream(
stream,
self._dd_integration,
self._dd_span,
self._args,
self._kwargs,
)
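        # mirror the anthropic SDK's MessageStream.text_stream helper so that user
        # code iterating text_stream keeps working on the traced wrapper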
traced_stream.text_stream = traced_stream._text_stream()
return traced_stream

def __exit__(self, exc_type, exc_val, exc_tb):
self.__wrapped__.__exit__(exc_type, exc_val, exc_tb)


class TracedAnthropicAsyncStreamManager(BaseTracedAnthropicStream):
async def __aenter__(self):
stream = await self.__wrapped__.__aenter__()
traced_stream = TracedAnthropicAsyncStream(
stream,
self._dd_integration,
self._dd_span,
self._args,
self._kwargs,
)
traced_stream.text_stream = traced_stream._text_stream()
return traced_stream

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)


def _process_finished_stream(integration, span, args, kwargs, streamed_chunks):
    # builds the response message from the streamed chunks and sets the corresponding span tags
    try:
        resp_message = _construct_message(streamed_chunks)

if integration.is_pc_sampled_span(span):
_tag_streamed_chat_completion_response(integration, span, resp_message)
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
span=span,
resp=resp_message,
args=args,
kwargs=kwargs,
)
except Exception:
log.warning("Error processing streamed completion/chat response.", exc_info=True)


def _construct_message(streamed_chunks):
"""Iteratively build up a response message from streamed chunks.

The resulting message dictionary is of form:
{"content": [{"type": [TYPE], "text": "[TEXT]"}], "role": "...", "finish_reason": "...", "usage": ...}
"""
    message = {"content": []}
    for chunk in streamed_chunks:
        message = _extract_from_chunk(chunk, message)
    return message


def _extract_from_chunk(chunk, message) -> Dict[str, Any]:
    """Constructs a chat message dictionary from streamed chunks given chunk type"""
TRANSFORMATIONS_BY_BLOCK_TYPE = {
"message_start": _on_message_start_chunk,
"content_block_start": _on_content_block_start_chunk,
"content_block_delta": _on_content_block_delta_chunk,
"message_delta": _on_message_delta_chunk,
}
chunk_type = getattr(chunk, "type", "")
transformation = TRANSFORMATIONS_BY_BLOCK_TYPE.get(chunk_type)
if transformation is not None:
message = transformation(chunk, message)

return message
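
# For reference, a streamed Messages response emits events in roughly this order
# (per Anthropic's streaming documentation): message_start, then one or more
# content_block_start / content_block_delta / content_block_stop groups, then
# message_delta and message_stop. content_block_stop and message_stop carry no
# content, which is why they have no entry in the transformation table above.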


def _on_message_start_chunk(chunk, message):
    # this is the starting chunk of the message
    if getattr(chunk, "type", "") != "message_start":
        return message

    chunk_message = getattr(chunk, "message", "")
    if chunk_message:
        contents = getattr(chunk.message, "content", [])
        for content in contents:
            # append one content block per content entry rather than
            # accumulating text across entries
            if content.type == "text":
                message["content"].append({"text": content.text, "type": "text"})
            elif content.type == "image":
                message["content"].append({"text": "([IMAGE DETECTED])", "type": "image"})

    chunk_role = getattr(chunk_message, "role", "")
    chunk_usage = getattr(chunk_message, "usage", "")
    chunk_finish_reason = getattr(chunk_message, "stop_reason", "")
    if chunk_role:
        message["role"] = chunk_role
    if chunk_usage:
        message["usage"] = {}
        message["usage"]["input_tokens"] = getattr(chunk_usage, "input_tokens", 0)
        message["usage"]["output_tokens"] = getattr(chunk_usage, "output_tokens", 0)
    if chunk_finish_reason:
        message["finish_reason"] = chunk_finish_reason
    return message
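
# Illustrative message_start event (field names per the Anthropic streaming API,
# values invented for the example):
#   {"type": "message_start",
#    "message": {"role": "assistant", "content": [],
#                "usage": {"input_tokens": 25, "output_tokens": 1}}}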


def _on_content_block_start_chunk(chunk, message):
    # this is the start of a message.content block (possibly 1 of several content blocks)
if getattr(chunk, "type", "") != "content_block_start":
return message

message["content"].append({"type": "text", "text": ""})
return message


def _on_content_block_delta_chunk(chunk, message):
# delta events contain new content for the current message.content block
if getattr(chunk, "type", "") != "content_block_delta":
return message

delta_block = getattr(chunk, "delta", "")
chunk_content = getattr(delta_block, "text", "")
if chunk_content:
message["content"][-1]["text"] += chunk_content
return message


def _on_message_delta_chunk(chunk, message):
# message delta events signal the end of the message
if getattr(chunk, "type", "") != "message_delta":
return message

delta_block = getattr(chunk, "delta", "")
chunk_finish_reason = getattr(delta_block, "stop_reason", "")
if chunk_finish_reason:
message["finish_reason"] = chunk_finish_reason
message["content"][-1]["text"] = message["content"][-1]["text"].strip()

chunk_usage = getattr(chunk, "usage", {})
if chunk_usage:
message_usage = message.get("usage", {"output_tokens": 0, "input_tokens": 0})
message_usage["output_tokens"] += getattr(chunk_usage, "output_tokens", 0)
message_usage["input_tokens"] += getattr(chunk_usage, "input_tokens", 0)
message["usage"] = message_usage

return message
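
# Illustrative message_delta event (field names per the Anthropic streaming API,
# values invented for the example); the final delta carries the stop reason and
# the cumulative output token count:
#   {"type": "message_delta", "delta": {"stop_reason": "end_turn"},
#    "usage": {"output_tokens": 15}}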


# To-Do: Handle error blocks appropriately
# def _on_error_chunk(chunk, message):
#     if getattr(chunk, "type", "") != "error":
#         return message

#     message["content"].append({"type": "text", "text": ""})
#     return message


def _tag_streamed_chat_completion_response(integration, span, message):
"""Tagging logic for streamed chat completions."""
if message is None:
return
for idx, block in enumerate(message["content"]):
span.set_tag_str("anthropic.response.completions.content.%d.type" % idx, str(integration.trunc(block["type"])))
span.set_tag_str("anthropic.response.completions.content.%d.text" % idx, str(integration.trunc(block["text"])))
span.set_tag_str("anthropic.response.completions.role", str(message["role"]))
if message.get("finish_reason") is not None:
span.set_tag_str("anthropic.response.completions.finish_reason", str(message["finish_reason"]))

usage = _get_attr(message, "usage", {})
integration.record_usage(span, usage)


def _is_stream(resp):
    # type: (...) -> bool
    import anthropic

    return hasattr(anthropic, "Stream") and isinstance(resp, anthropic.Stream)


def _is_async_stream(resp):
    # type: (...) -> bool
    import anthropic

    return hasattr(anthropic, "AsyncStream") and isinstance(resp, anthropic.AsyncStream)


def _is_stream_manager(resp):
    # type: (...) -> bool
    import anthropic

    return hasattr(anthropic, "MessageStreamManager") and isinstance(resp, anthropic.MessageStreamManager)


def _is_async_stream_manager(resp):
    # type: (...) -> bool
    import anthropic

    return hasattr(anthropic, "AsyncMessageStreamManager") and isinstance(resp, anthropic.AsyncMessageStreamManager)
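
For context, here is a minimal sketch of the user-facing flow this diff instruments. It assumes the integration is enabled via ddtrace's patch(); the client calls are the standard anthropic SDK API, and the model name is only an example:

import anthropic
from ddtrace import patch

patch(anthropic=True)  # streams returned by the client below are wrapped transparently

client = anthropic.Anthropic()
with client.messages.stream(
    model="claude-3-opus-20240229",  # example model
    max_tokens=256,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:
    # text_stream is the SDK helper re-exposed by TracedAnthropicStream above
    for text in stream.text_stream:
        print(text, end="")
# once the stream is fully consumed, the integration finishes and tags the span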