diff --git a/ddtrace/contrib/internal/openai/_endpoint_hooks.py b/ddtrace/contrib/internal/openai/_endpoint_hooks.py
index 73a2b2511c9..979e1774a8a 100644
--- a/ddtrace/contrib/internal/openai/_endpoint_hooks.py
+++ b/ddtrace/contrib/internal/openai/_endpoint_hooks.py
@@ -255,6 +255,14 @@ def _record_request(self, pin, integration, span, args, kwargs):
             span.set_tag_str("openai.request.messages.%d.content" % idx, integration.trunc(str(content)))
             span.set_tag_str("openai.request.messages.%d.role" % idx, str(role))
             span.set_tag_str("openai.request.messages.%d.name" % idx, str(name))
+        if parse_version(OPENAI_VERSION) >= (1, 26) and kwargs.get("stream"):
+            if kwargs.get("stream_options", {}).get("include_usage", None) is not None:
+                # The user explicitly set include_usage; respect that choice and skip token chunk auto-extraction
+                return
+            span._set_ctx_item("_dd.auto_extract_token_chunk", True)
+            stream_options = kwargs.get("stream_options", {})
+            stream_options["include_usage"] = True
+            kwargs["stream_options"] = stream_options
 
     def _record_response(self, pin, integration, span, args, kwargs, resp, error):
         resp = super()._record_response(pin, integration, span, args, kwargs, resp, error)
diff --git a/ddtrace/contrib/internal/openai/utils.py b/ddtrace/contrib/internal/openai/utils.py
index d967383e366..f5dfc10efef 100644
--- a/ddtrace/contrib/internal/openai/utils.py
+++ b/ddtrace/contrib/internal/openai/utils.py
@@ -48,11 +48,28 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         self.__wrapped__.__exit__(exc_type, exc_val, exc_tb)
 
     def __iter__(self):
-        return self
+        exception_raised = False
+        try:
+            for chunk in self.__wrapped__:
+                self._extract_token_chunk(chunk)
+                yield chunk
+                _loop_handler(self._dd_span, chunk, self._streamed_chunks)
+        except Exception:
+            self._dd_span.set_exc_info(*sys.exc_info())
+            exception_raised = True
+            raise
+        finally:
+            if not exception_raised:
+                _process_finished_stream(
+                    self._dd_integration, self._dd_span, self._kwargs, self._streamed_chunks, self._is_completion
+                )
+            self._dd_span.finish()
+            self._dd_integration.metric(self._dd_span, "dist", "request.duration", self._dd_span.duration_ns)
 
     def __next__(self):
         try:
             chunk = self.__wrapped__.__next__()
+            self._extract_token_chunk(chunk)
             _loop_handler(self._dd_span, chunk, self._streamed_chunks)
             return chunk
         except StopIteration:
@@ -68,6 +85,22 @@ def __next__(self):
             self._dd_integration.metric(self._dd_span, "dist", "request.duration", self._dd_span.duration_ns)
             raise
 
+    def _extract_token_chunk(self, chunk):
+        """Attempt to extract the token chunk (last chunk in the stream) from the streamed response."""
+        if not self._dd_span._get_ctx_item("_dd.auto_extract_token_chunk"):
+            return
+        choice = getattr(chunk, "choices", [None])[0]
+        if not getattr(choice, "finish_reason", None):
+            # With token usage enabled, only the second-to-last chunk in the stream has finish_reason set
+            return
+        try:
+            # The user doesn't expect the final token usage chunk, since it isn't part of the default streamed
+            # response, so we consume it and extract the token usage metadata before it reaches the user.
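+            # Prepending the usage chunk to the buffered streamed chunks lets _process_finished_stream
+            # record the reported token counts when the span is finalized.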
+            usage_chunk = self.__wrapped__.__next__()
+            self._streamed_chunks[0].insert(0, usage_chunk)
+        except (StopIteration, GeneratorExit):
+            return
+
 
 class TracedOpenAIAsyncStream(BaseTracedOpenAIStream):
     async def __aenter__(self):
@@ -77,12 +110,29 @@ async def __aenter__(self):
     async def __aexit__(self, exc_type, exc_val, exc_tb):
         await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)
 
-    def __aiter__(self):
-        return self
+    async def __aiter__(self):
+        exception_raised = False
+        try:
+            async for chunk in self.__wrapped__:
+                await self._extract_token_chunk(chunk)
+                yield chunk
+                _loop_handler(self._dd_span, chunk, self._streamed_chunks)
+        except Exception:
+            self._dd_span.set_exc_info(*sys.exc_info())
+            exception_raised = True
+            raise
+        finally:
+            if not exception_raised:
+                _process_finished_stream(
+                    self._dd_integration, self._dd_span, self._kwargs, self._streamed_chunks, self._is_completion
+                )
+            self._dd_span.finish()
+            self._dd_integration.metric(self._dd_span, "dist", "request.duration", self._dd_span.duration_ns)
 
     async def __anext__(self):
         try:
             chunk = await self.__wrapped__.__anext__()
+            await self._extract_token_chunk(chunk)
             _loop_handler(self._dd_span, chunk, self._streamed_chunks)
             return chunk
         except StopAsyncIteration:
@@ -98,6 +148,19 @@ async def __anext__(self):
             self._dd_integration.metric(self._dd_span, "dist", "request.duration", self._dd_span.duration_ns)
             raise
 
+    async def _extract_token_chunk(self, chunk):
+        """Attempt to extract the token chunk (last chunk in the stream) from the streamed response."""
+        if not self._dd_span._get_ctx_item("_dd.auto_extract_token_chunk"):
+            return
+        choice = getattr(chunk, "choices", [None])[0]
+        if not getattr(choice, "finish_reason", None):
+            return
+        try:
+            usage_chunk = await self.__wrapped__.__anext__()
+            self._streamed_chunks[0].insert(0, usage_chunk)
+        except (StopAsyncIteration, GeneratorExit):
+            return
+
 
 def _compute_token_count(content, model):
     # type: (Union[str, List[int]], Optional[str]) -> Tuple[bool, int]
diff --git a/releasenotes/notes/feat-openai-streamed-chunk-auto-extract-4cbaea8870b1df13.yaml b/releasenotes/notes/feat-openai-streamed-chunk-auto-extract-4cbaea8870b1df13.yaml
new file mode 100644
index 00000000000..afaf95876d5
--- /dev/null
+++ b/releasenotes/notes/feat-openai-streamed-chunk-auto-extract-4cbaea8870b1df13.yaml
@@ -0,0 +1,7 @@
+---
+features:
+  - |
+    openai: Introduces automatic extraction of token usage from streamed chat completions.
+    Unless ``stream_options: {"include_usage": False}`` is explicitly set on your streamed chat completion request,
+    the OpenAI integration will add ``stream_options: {"include_usage": True}`` to your request and automatically
+    extract the token usage chunk from the streamed response before it reaches your code.
diff --git a/tests/contrib/openai/test_openai_llmobs.py b/tests/contrib/openai/test_openai_llmobs.py
index a1a2b93a5ca..a145877c8c8 100644
--- a/tests/contrib/openai/test_openai_llmobs.py
+++ b/tests/contrib/openai/test_openai_llmobs.py
@@ -518,11 +518,17 @@ async def test_chat_completion_azure_async(
             )
         )
 
-    def test_chat_completion_stream(self, openai, ddtrace_global_config, mock_llmobs_writer, mock_tracer):
+    @pytest.mark.skipif(
+        parse_version(openai_module.version.VERSION) < (1, 26), reason="Stream options only available openai >= 1.26"
+    )
+    def test_chat_completion_stream_explicit_no_tokens(
+        self, openai, ddtrace_global_config, mock_llmobs_writer, mock_tracer
+    ):
         """Ensure llmobs records are emitted for chat completion endpoints when configured.
 
         Also ensure the llmobs records have the correct tagging including trace/span ID for trace correlation.
         """
+
         with get_openai_vcr(subdirectory_name="v1").use_cassette("chat_completion_streamed.yaml"):
             with mock.patch("ddtrace.contrib.internal.openai.utils.encoding_for_model", create=True) as mock_encoding:
                 with mock.patch("ddtrace.contrib.internal.openai.utils._est_tokens") as mock_est:
@@ -534,7 +540,11 @@ def test_chat_completion_stream(self, openai, ddtrace_global_config, mock_llmobs
                     expected_completion = "The Los Angeles Dodgers won the World Series in 2020."
                     client = openai.OpenAI()
                     resp = client.chat.completions.create(
-                        model=model, messages=input_messages, stream=True, user="ddtrace-test"
+                        model=model,
+                        messages=input_messages,
+                        stream=True,
+                        user="ddtrace-test",
+                        stream_options={"include_usage": False},
                     )
                     for chunk in resp:
                         resp_model = chunk.model
@@ -547,7 +557,7 @@ def test_chat_completion_stream(self, openai, ddtrace_global_config, mock_llmobs
                 model_provider="openai",
                 input_messages=input_messages,
                 output_messages=[{"content": expected_completion, "role": "assistant"}],
-                metadata={"stream": True, "user": "ddtrace-test"},
+                metadata={"stream": True, "stream_options": {"include_usage": False}, "user": "ddtrace-test"},
                 token_metrics={"input_tokens": 8, "output_tokens": 8, "total_tokens": 16},
                 tags={"ml_app": "", "service": "tests.contrib.openai"},
             )
@@ -557,20 +567,14 @@ def test_chat_completion_stream(self, openai, ddtrace_global_config, mock_llmobs
         parse_version(openai_module.version.VERSION) < (1, 26, 0), reason="Streamed tokens available in 1.26.0+"
     )
     def test_chat_completion_stream_tokens(self, openai, ddtrace_global_config, mock_llmobs_writer, mock_tracer):
-        """
-        Ensure llmobs records are emitted for chat completion endpoints when configured
-        with the `stream_options={"include_usage": True}`.
-        Also ensure the llmobs records have the correct tagging including trace/span ID for trace correlation.
-        """
+        """Assert that the token usage chunk is auto-extracted when the user does not pass stream options."""
         with get_openai_vcr(subdirectory_name="v1").use_cassette("chat_completion_streamed_tokens.yaml"):
             model = "gpt-3.5-turbo"
             resp_model = model
             input_messages = [{"role": "user", "content": "Who won the world series in 2020?"}]
             expected_completion = "The Los Angeles Dodgers won the World Series in 2020."
client = openai.OpenAI() - resp = client.chat.completions.create( - model=model, messages=input_messages, stream=True, stream_options={"include_usage": True} - ) + resp = client.chat.completions.create(model=model, messages=input_messages, stream=True) for chunk in resp: resp_model = chunk.model span = mock_tracer.pop_traces()[0][0] @@ -671,7 +675,6 @@ def test_chat_completion_tool_call_stream(self, openai, ddtrace_global_config, m messages=[{"role": "user", "content": chat_completion_input_description}], user="ddtrace-test", stream=True, - stream_options={"include_usage": True}, ) for chunk in resp: resp_model = chunk.model diff --git a/tests/contrib/openai/test_openai_v1.py b/tests/contrib/openai/test_openai_v1.py index f13de144fc5..91737d9e5eb 100644 --- a/tests/contrib/openai/test_openai_v1.py +++ b/tests/contrib/openai/test_openai_v1.py @@ -921,128 +921,78 @@ def test_span_finish_on_stream_error(openai, openai_vcr, snapshot_tracer): ) -def test_completion_stream(openai, openai_vcr, mock_metrics, mock_tracer): +@pytest.mark.snapshot +@pytest.mark.skipif(TIKTOKEN_AVAILABLE, reason="This test estimates token counts") +def test_completion_stream_est_tokens(openai, openai_vcr, mock_metrics, snapshot_tracer): with openai_vcr.use_cassette("completion_streamed.yaml"): with mock.patch("ddtrace.contrib.internal.openai.utils.encoding_for_model", create=True) as mock_encoding: mock_encoding.return_value.encode.side_effect = lambda x: [1, 2] - expected_completion = '! ... A page layouts page drawer? ... Interesting. The "Tools" is' client = openai.OpenAI() resp = client.completions.create(model="ada", prompt="Hello world", stream=True, n=None) - chunks = [c for c in resp] - - completion = "".join([c.choices[0].text for c in chunks]) - assert completion == expected_completion + _ = [c for c in resp] - traces = mock_tracer.pop_traces() - assert len(traces) == 1 - assert len(traces[0]) == 1 - assert traces[0][0].get_tag("openai.response.choices.0.text") == expected_completion - assert traces[0][0].get_tag("openai.response.choices.0.finish_reason") == "length" - expected_tags = [ - "version:", - "env:", - "service:tests.contrib.openai", - "openai.request.model:ada", - "model:ada", - "openai.request.endpoint:/v1/completions", - "openai.request.method:POST", - "openai.organization.id:", - "openai.organization.name:datadog-4", - "openai.user.api_key:sk-...key>", - "error:0", - "openai.estimated:true", - ] - if TIKTOKEN_AVAILABLE: - expected_tags = expected_tags[:-1] - assert mock.call.distribution("tokens.prompt", 2, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.completion", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.total", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls +@pytest.mark.skipif(not TIKTOKEN_AVAILABLE, reason="This test computes token counts using tiktoken") +@pytest.mark.snapshot(token="tests.contrib.openai.test_openai.test_completion_stream") +def test_completion_stream(openai, openai_vcr, mock_metrics, snapshot_tracer): + with openai_vcr.use_cassette("completion_streamed.yaml"): + with mock.patch("ddtrace.contrib.internal.openai.utils.encoding_for_model", create=True) as mock_encoding: + mock_encoding.return_value.encode.side_effect = lambda x: [1, 2] + client = openai.OpenAI() + resp = client.completions.create(model="ada", prompt="Hello world", stream=True, n=None) + _ = [c for c in resp] -async def test_completion_async_stream(openai, openai_vcr, mock_metrics, mock_tracer): 
+@pytest.mark.skipif(not TIKTOKEN_AVAILABLE, reason="This test computes token counts using tiktoken") +@pytest.mark.snapshot(token="tests.contrib.openai.test_openai.test_completion_stream") +async def test_completion_async_stream(openai, openai_vcr, mock_metrics, snapshot_tracer): with openai_vcr.use_cassette("completion_streamed.yaml"): with mock.patch("ddtrace.contrib.internal.openai.utils.encoding_for_model", create=True) as mock_encoding: mock_encoding.return_value.encode.side_effect = lambda x: [1, 2] - expected_completion = '! ... A page layouts page drawer? ... Interesting. The "Tools" is' client = openai.AsyncOpenAI() - resp = await client.completions.create(model="ada", prompt="Hello world", stream=True) - chunks = [c async for c in resp] - - completion = "".join([c.choices[0].text for c in chunks]) - assert completion == expected_completion - - traces = mock_tracer.pop_traces() - assert len(traces) == 1 - assert len(traces[0]) == 1 - assert traces[0][0].get_tag("openai.response.choices.0.text") == expected_completion - assert traces[0][0].get_tag("openai.response.choices.0.finish_reason") == "length" - - expected_tags = [ - "version:", - "env:", - "service:tests.contrib.openai", - "openai.request.model:ada", - "model:ada", - "openai.request.endpoint:/v1/completions", - "openai.request.method:POST", - "openai.organization.id:", - "openai.organization.name:datadog-4", - "openai.user.api_key:sk-...key>", - "error:0", - "openai.estimated:true", - ] - if TIKTOKEN_AVAILABLE: - expected_tags = expected_tags[:-1] - assert mock.call.distribution("tokens.prompt", 2, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.completion", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.total", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls + resp = await client.completions.create(model="ada", prompt="Hello world", stream=True, n=None) + _ = [c async for c in resp] @pytest.mark.skipif( - parse_version(openai_module.version.VERSION) < (1, 6, 0), + parse_version(openai_module.version.VERSION) < (1, 6, 0) or not TIKTOKEN_AVAILABLE, reason="Streamed response context managers are only available v1.6.0+", ) -def test_completion_stream_context_manager(openai, openai_vcr, mock_metrics, mock_tracer): +@pytest.mark.snapshot(token="tests.contrib.openai.test_openai.test_completion_stream") +def test_completion_stream_context_manager(openai, openai_vcr, mock_metrics, snapshot_tracer): with openai_vcr.use_cassette("completion_streamed.yaml"): with mock.patch("ddtrace.contrib.internal.openai.utils.encoding_for_model", create=True) as mock_encoding: mock_encoding.return_value.encode.side_effect = lambda x: [1, 2] - expected_completion = '! ... A page layouts page drawer? ... Interesting. 
The "Tools" is' client = openai.OpenAI() with client.completions.create(model="ada", prompt="Hello world", stream=True, n=None) as resp: - chunks = [c for c in resp] + _ = [c for c in resp] - completion = "".join([c.choices[0].text for c in chunks]) - assert completion == expected_completion - - traces = mock_tracer.pop_traces() - assert len(traces) == 1 - assert len(traces[0]) == 1 - assert traces[0][0].get_tag("openai.response.choices.0.text") == expected_completion - assert traces[0][0].get_tag("openai.response.choices.0.finish_reason") == "length" - expected_tags = [ - "version:", - "env:", - "service:tests.contrib.openai", - "openai.request.model:ada", - "model:ada", - "openai.request.endpoint:/v1/completions", - "openai.request.method:POST", - "openai.organization.id:", - "openai.organization.name:datadog-4", - "openai.user.api_key:sk-...key>", - "error:0", - "openai.estimated:true", - ] - if TIKTOKEN_AVAILABLE: - expected_tags = expected_tags[:-1] - assert mock.call.distribution("tokens.prompt", 2, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.completion", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.total", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls +@pytest.mark.skipif( + parse_version(openai_module.version.VERSION) < (1, 26), reason="Stream options only available openai >= 1.26" +) +@pytest.mark.snapshot(token="tests.contrib.openai.test_openai.test_chat_completion_stream") +def test_chat_completion_stream(openai, openai_vcr, mock_metrics, snapshot_tracer): + """Assert that streamed token chunk extraction logic works automatically.""" + with openai_vcr.use_cassette("chat_completion_streamed_tokens.yaml"): + with mock.patch("ddtrace.contrib.internal.openai.utils.encoding_for_model", create=True) as mock_encoding: + mock_encoding.return_value.encode.side_effect = lambda x: [1, 2, 3, 4, 5, 6, 7, 8] + client = openai.OpenAI() + resp = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Who won the world series in 2020?"}], + stream=True, + user="ddtrace-test", + n=None, + ) + _ = [c for c in resp] -def test_chat_completion_stream(openai, openai_vcr, mock_metrics, snapshot_tracer): +@pytest.mark.skipif( + parse_version(openai_module.version.VERSION) < (1, 26), reason="Stream options only available openai >= 1.26" +) +def test_chat_completion_stream_explicit_no_tokens(openai, openai_vcr, mock_metrics, snapshot_tracer): + """Assert that streamed token chunk extraction logic is avoided if explicitly set to False by the user.""" with openai_vcr.use_cassette("chat_completion_streamed.yaml"): with mock.patch("ddtrace.contrib.internal.openai.utils.encoding_for_model", create=True) as mock_encoding: mock_encoding.return_value.encode.side_effect = lambda x: [1, 2, 3, 4, 5, 6, 7, 8] @@ -1054,20 +1004,16 @@ def test_chat_completion_stream(openai, openai_vcr, mock_metrics, snapshot_trace {"role": "user", "content": "Who won the world series in 2020?"}, ], stream=True, + stream_options={"include_usage": False}, user="ddtrace-test", n=None, ) - prompt_tokens = 8 span = snapshot_tracer.current_span() chunks = [c for c in resp] assert len(chunks) == 15 completion = "".join([c.choices[0].delta.content for c in chunks if c.choices[0].delta.content is not None]) assert completion == expected_completion - assert span.get_tag("openai.response.choices.0.message.content") == expected_completion - assert span.get_tag("openai.response.choices.0.message.role") 
== "assistant" - assert span.get_tag("openai.response.choices.0.finish_reason") == "stop" - expected_tags = [ "version:", "env:", @@ -1087,16 +1033,19 @@ def test_chat_completion_stream(openai, openai_vcr, mock_metrics, snapshot_trace expected_tags += ["openai.estimated:true"] if TIKTOKEN_AVAILABLE: expected_tags = expected_tags[:-1] - assert mock.call.distribution("tokens.prompt", prompt_tokens, tags=expected_tags) in mock_metrics.mock_calls + assert mock.call.distribution("tokens.prompt", 8, tags=expected_tags) in mock_metrics.mock_calls assert mock.call.distribution("tokens.completion", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls assert mock.call.distribution("tokens.total", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls +@pytest.mark.skipif( + parse_version(openai_module.version.VERSION) < (1, 26, 0), reason="Streamed tokens available in 1.26.0+" +) +@pytest.mark.snapshot(token="tests.contrib.openai.test_openai.test_chat_completion_stream") async def test_chat_completion_async_stream(openai, openai_vcr, mock_metrics, snapshot_tracer): - with openai_vcr.use_cassette("chat_completion_streamed.yaml"): + with openai_vcr.use_cassette("chat_completion_streamed_tokens.yaml"): with mock.patch("ddtrace.contrib.internal.openai.utils.encoding_for_model", create=True) as mock_encoding: mock_encoding.return_value.encode.side_effect = lambda x: [1, 2, 3, 4, 5, 6, 7, 8] - expected_completion = "The Los Angeles Dodgers won the World Series in 2020." client = openai.AsyncOpenAI() resp = await client.chat.completions.create( model="gpt-3.5-turbo", @@ -1104,99 +1053,21 @@ async def test_chat_completion_async_stream(openai, openai_vcr, mock_metrics, sn {"role": "user", "content": "Who won the world series in 2020?"}, ], stream=True, + n=None, user="ddtrace-test", ) - prompt_tokens = 8 - span = snapshot_tracer.current_span() - chunks = [c async for c in resp] - assert len(chunks) == 15 - completion = "".join([c.choices[0].delta.content for c in chunks if c.choices[0].delta.content is not None]) - assert completion == expected_completion - - assert span.get_tag("openai.response.choices.0.message.content") == expected_completion - assert span.get_tag("openai.response.choices.0.message.role") == "assistant" - assert span.get_tag("openai.response.choices.0.finish_reason") == "stop" - - expected_tags = [ - "version:", - "env:", - "service:tests.contrib.openai", - "openai.request.model:gpt-3.5-turbo", - "model:gpt-3.5-turbo", - "openai.request.endpoint:/v1/chat/completions", - "openai.request.method:POST", - "openai.organization.id:", - "openai.organization.name:datadog-4", - "openai.user.api_key:sk-...key>", - "error:0", - ] - assert mock.call.distribution("request.duration", span.duration_ns, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.gauge("ratelimit.requests", 3000, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.gauge("ratelimit.remaining.requests", 2999, tags=expected_tags) in mock_metrics.mock_calls - expected_tags += ["openai.estimated:true"] - if TIKTOKEN_AVAILABLE: - expected_tags = expected_tags[:-1] - assert mock.call.distribution("tokens.prompt", prompt_tokens, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.completion", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.total", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls - - -@pytest.mark.skipif( - parse_version(openai_module.version.VERSION) < (1, 26, 0), reason="Streamed tokens available 
in 1.26.0+" -) -def test_chat_completion_stream_tokens(openai, openai_vcr, mock_metrics, snapshot_tracer): - with openai_vcr.use_cassette("chat_completion_streamed_tokens.yaml"): - expected_completion = "The Los Angeles Dodgers won the World Series in 2020." - client = openai.OpenAI() - resp = client.chat.completions.create( - model="gpt-3.5-turbo", - messages=[{"role": "user", "content": "Who won the world series in 2020?"}], - stream=True, - user="ddtrace-test", - n=None, - stream_options={"include_usage": True}, - ) - span = snapshot_tracer.current_span() - chunks = [c for c in resp] - completion = "".join( - [c.choices[0].delta.content for c in chunks if c.choices and c.choices[0].delta.content is not None] - ) - assert completion == expected_completion - - assert span.get_tag("openai.response.choices.0.message.content") == expected_completion - assert span.get_tag("openai.response.choices.0.message.role") == "assistant" - assert span.get_tag("openai.response.choices.0.finish_reason") == "stop" - - expected_tags = [ - "version:", - "env:", - "service:tests.contrib.openai", - "openai.request.model:gpt-3.5-turbo", - "model:gpt-3.5-turbo", - "openai.request.endpoint:/v1/chat/completions", - "openai.request.method:POST", - "openai.organization.id:", - "openai.organization.name:datadog-4", - "openai.user.api_key:sk-...key>", - "error:0", - ] - assert mock.call.distribution("request.duration", span.duration_ns, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.gauge("ratelimit.requests", 3000, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.gauge("ratelimit.remaining.requests", 2999, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.prompt", 17, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.completion", 19, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.total", 36, tags=expected_tags) in mock_metrics.mock_calls + _ = [c async for c in resp] @pytest.mark.skipif( - parse_version(openai_module.version.VERSION) < (1, 6, 0), - reason="Streamed response context managers are only available v1.6.0+", + parse_version(openai_module.version.VERSION) < (1, 26, 0), + reason="Streamed response context managers are only available v1.6.0+, tokens available 1.26.0+", ) +@pytest.mark.snapshot(token="tests.contrib.openai.test_openai.test_chat_completion_stream") async def test_chat_completion_async_stream_context_manager(openai, openai_vcr, mock_metrics, snapshot_tracer): - with openai_vcr.use_cassette("chat_completion_streamed.yaml"): + with openai_vcr.use_cassette("chat_completion_streamed_tokens.yaml"): with mock.patch("ddtrace.contrib.internal.openai.utils.encoding_for_model", create=True) as mock_encoding: mock_encoding.return_value.encode.side_effect = lambda x: [1, 2, 3, 4, 5, 6, 7, 8] - expected_completion = "The Los Angeles Dodgers won the World Series in 2020." 
client = openai.AsyncOpenAI() async with await client.chat.completions.create( model="gpt-3.5-turbo", @@ -1207,41 +1078,7 @@ async def test_chat_completion_async_stream_context_manager(openai, openai_vcr, user="ddtrace-test", n=None, ) as resp: - prompt_tokens = 8 - span = snapshot_tracer.current_span() - chunks = [c async for c in resp] - assert len(chunks) == 15 - completion = "".join( - [c.choices[0].delta.content for c in chunks if c.choices[0].delta.content is not None] - ) - assert completion == expected_completion - - assert span.get_tag("openai.response.choices.0.message.content") == expected_completion - assert span.get_tag("openai.response.choices.0.message.role") == "assistant" - assert span.get_tag("openai.response.choices.0.finish_reason") == "stop" - - expected_tags = [ - "version:", - "env:", - "service:tests.contrib.openai", - "openai.request.model:gpt-3.5-turbo", - "model:gpt-3.5-turbo", - "openai.request.endpoint:/v1/chat/completions", - "openai.request.method:POST", - "openai.organization.id:", - "openai.organization.name:datadog-4", - "openai.user.api_key:sk-...key>", - "error:0", - ] - assert mock.call.distribution("request.duration", span.duration_ns, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.gauge("ratelimit.requests", 3000, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.gauge("ratelimit.remaining.requests", 2999, tags=expected_tags) in mock_metrics.mock_calls - expected_tags += ["openai.estimated:true"] - if TIKTOKEN_AVAILABLE: - expected_tags = expected_tags[:-1] - assert mock.call.distribution("tokens.prompt", prompt_tokens, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.completion", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls - assert mock.call.distribution("tokens.total", mock.ANY, tags=expected_tags) in mock_metrics.mock_calls + _ = [c async for c in resp] @pytest.mark.snapshot( diff --git a/tests/snapshots/tests.contrib.openai.test_openai.test_chat_completion_stream.json b/tests/snapshots/tests.contrib.openai.test_openai.test_chat_completion_stream.json new file mode 100644 index 00000000000..fe7c9e3b0f2 --- /dev/null +++ b/tests/snapshots/tests.contrib.openai.test_openai.test_chat_completion_stream.json @@ -0,0 +1,53 @@ +[[ + { + "name": "openai.request", + "service": "tests.contrib.openai", + "resource": "createChatCompletion", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "", + "error": 0, + "meta": { + "_dd.p.dm": "-0", + "_dd.p.tid": "67741fca00000000", + "component": "openai", + "language": "python", + "openai.base_url": "https://api.openai.com/v1/", + "openai.organization.name": "datadog-4", + "openai.request.client": "OpenAI", + "openai.request.endpoint": "/v1/chat/completions", + "openai.request.messages.0.content": "Who won the world series in 2020?", + "openai.request.messages.0.name": "", + "openai.request.messages.0.role": "user", + "openai.request.method": "POST", + "openai.request.model": "gpt-3.5-turbo", + "openai.request.n": "None", + "openai.request.stream": "True", + "openai.request.user": "ddtrace-test", + "openai.response.choices.0.finish_reason": "stop", + "openai.response.choices.0.message.content": "The Los Angeles Dodgers won the World Series in 2020.", + "openai.response.choices.0.message.role": "assistant", + "openai.response.model": "gpt-3.5-turbo-0301", + "openai.user.api_key": "sk-...key>", + "runtime-id": "d174f65e33314f43ad1de8cf0a5ca4e0" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 
1.0, + "_sampling_priority_v1": 1, + "openai.organization.ratelimit.requests.limit": 3000, + "openai.organization.ratelimit.requests.remaining": 2999, + "openai.organization.ratelimit.tokens.limit": 250000, + "openai.organization.ratelimit.tokens.remaining": 249979, + "openai.request.prompt_tokens_estimated": 0, + "openai.response.completion_tokens_estimated": 0, + "openai.response.usage.completion_tokens": 19, + "openai.response.usage.prompt_tokens": 17, + "openai.response.usage.total_tokens": 36, + "process_id": 22982 + }, + "duration": 29869000, + "start": 1735663562179157000 + }]] diff --git a/tests/snapshots/tests.contrib.openai.test_openai.test_completion_stream.json b/tests/snapshots/tests.contrib.openai.test_openai.test_completion_stream.json new file mode 100644 index 00000000000..7cf644cfb3d --- /dev/null +++ b/tests/snapshots/tests.contrib.openai.test_openai.test_completion_stream.json @@ -0,0 +1,49 @@ +[[ + { + "name": "openai.request", + "service": "tests.contrib.openai", + "resource": "createCompletion", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "", + "error": 0, + "meta": { + "_dd.p.dm": "-0", + "_dd.p.tid": "6774231f00000000", + "component": "openai", + "language": "python", + "openai.base_url": "https://api.openai.com/v1/", + "openai.organization.name": "datadog-4", + "openai.request.client": "OpenAI", + "openai.request.endpoint": "/v1/completions", + "openai.request.method": "POST", + "openai.request.model": "ada", + "openai.request.n": "None", + "openai.request.prompt.0": "Hello world", + "openai.request.stream": "True", + "openai.response.choices.0.finish_reason": "length", + "openai.response.choices.0.text": "! ... A page layouts page drawer? ... Interesting. The \"Tools\" is", + "openai.response.model": "ada", + "openai.user.api_key": "sk-...key>", + "runtime-id": "11872c9ca653441db861b108a4f795eb" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "openai.organization.ratelimit.requests.limit": 3000, + "openai.organization.ratelimit.requests.remaining": 2999, + "openai.organization.ratelimit.tokens.limit": 250000, + "openai.organization.ratelimit.tokens.remaining": 249979, + "openai.request.prompt_tokens_estimated": 0, + "openai.response.completion_tokens_estimated": 0, + "openai.response.usage.completion_tokens": 2, + "openai.response.usage.prompt_tokens": 2, + "openai.response.usage.total_tokens": 4, + "process_id": 27488 + }, + "duration": 28739000, + "start": 1735664415266386000 + }]] diff --git a/tests/snapshots/tests.contrib.openai.test_openai_v1.test_completion_stream_est_tokens.json b/tests/snapshots/tests.contrib.openai.test_openai_v1.test_completion_stream_est_tokens.json new file mode 100644 index 00000000000..445dc39db98 --- /dev/null +++ b/tests/snapshots/tests.contrib.openai.test_openai_v1.test_completion_stream_est_tokens.json @@ -0,0 +1,49 @@ +[[ + { + "name": "openai.request", + "service": "tests.contrib.openai", + "resource": "createCompletion", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "", + "error": 0, + "meta": { + "_dd.p.dm": "-0", + "_dd.p.tid": "677c221c00000000", + "component": "openai", + "language": "python", + "openai.base_url": "https://api.openai.com/v1/", + "openai.organization.name": "datadog-4", + "openai.request.client": "OpenAI", + "openai.request.endpoint": "/v1/completions", + "openai.request.method": "POST", + "openai.request.model": "ada", + "openai.request.n": "None", + "openai.request.prompt.0": "Hello world", + 
"openai.request.stream": "True", + "openai.response.choices.0.finish_reason": "length", + "openai.response.choices.0.text": "! ... A page layouts page drawer? ... Interesting. The \"Tools\" is", + "openai.response.model": "ada", + "openai.user.api_key": "sk-...key>", + "runtime-id": "24f8e851c87e4f758c73d6acd0aaf82b" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "openai.organization.ratelimit.requests.limit": 3000, + "openai.organization.ratelimit.requests.remaining": 2999, + "openai.organization.ratelimit.tokens.limit": 250000, + "openai.organization.ratelimit.tokens.remaining": 249979, + "openai.request.prompt_tokens_estimated": 1, + "openai.response.completion_tokens_estimated": 1, + "openai.response.usage.completion_tokens": 16, + "openai.response.usage.prompt_tokens": 2, + "openai.response.usage.total_tokens": 18, + "process_id": 47101 + }, + "duration": 37957000, + "start": 1736188444222291000 + }]]