fix(vertex_ai/gemini): improve chunk parsing for streaming responses #8401

Open · wants to merge 3 commits into base: main
litellm/llms/vertex_ai/gemini/vertex_and_google_ai_studio_gemini.py

@@ -1431,7 +1431,8 @@ def handle_accumulated_json_chunk(self, chunk: str) -> GenericStreamingChunk:

     def _common_chunk_parsing_logic(self, chunk: str) -> GenericStreamingChunk:
         try:
-            chunk = chunk.replace("data:", "")
+            if chunk.startswith("data: "):
+                chunk = chunk[len("data: "):]
             if len(chunk) > 0:
                 """
                 Check if initial chunk valid json
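For context, a minimal standalone sketch (hypothetical snippet, not part of the diff) contrasting the old replace-based stripping with the new prefix check. The old call removes every occurrence of "data:", including ones inside the model's own generated text:

# Illustrative only: why replace("data:", "") corrupts content
# while a prefix-only check does not.
raw = 'data: {"parts": [{"text": "data: Good morning! "}]}'

old = raw.replace("data:", "")  # old behavior: strips "data:" everywhere
new = raw[len("data: "):] if raw.startswith("data: ") else raw  # new behavior

assert '"data: Good morning! "' in new      # SSE framing removed, content intact
assert '"data: Good morning! "' not in old  # content mangled by the old code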
52 changes: 49 additions & 3 deletions tests/local_testing/test_streaming.py
@@ -794,7 +794,50 @@ async def test_completion_gemini_stream(sync_mode):
         # return
         pytest.fail(f"Error occurred: {e}")
 
+
+def test_vertex_and_google_studio_gemini_data_prefix_stripping():
+    """
+    Test that _common_chunk_parsing_logic correctly handles the 'data: ' prefix
+    in SSE streams, in particular that it does not strip 'data:' occurrences
+    from the actual message content.
+    """
+    print("\n=== Testing Vertex AI and Gemini data prefix stripping ===")
+    from litellm.llms.vertex_ai.gemini.vertex_and_google_ai_studio_gemini import (
+        ModelResponseIterator,
+    )
+
+    # Test case 1: SSE message with 'data:' prefix containing 'data:' in content
+    iterator = ModelResponseIterator(None, sync_stream=True)
+    chunk = 'data: {"candidates": [{"content": {"role": "model","parts": [{"text": "data: Good morning! "}]}}]}'
+    print("\nTest case 1 - SSE message with 'data:' prefix in content")
+    print(f"Input chunk: {chunk}")
+    result = iterator._common_chunk_parsing_logic(chunk)
+    print(f"Parsed result: {result}")
+    assert result["text"] == "data: Good morning! ", "Should preserve 'data:' in content"
+
+    # Test case 2: Verify surrounding spaces are preserved in message content
+    chunk = 'data: {"candidates": [{"content": {"role": "model","parts": [{"text": " Hello! "}]}}]}'
+    print("\nTest case 2 - Message with surrounding spaces")
+    print(f"Input chunk: {chunk}")
+    result = iterator._common_chunk_parsing_logic(chunk)
+    print(f"Parsed result: {result}")
+    assert result["text"] == " Hello! ", "Should preserve surrounding spaces in message content"
+
+    # Test case 3: Multiple 'data:' occurrences in content
+    chunk = 'data: {"candidates": [{"content": {"role": "model","parts": [{"text": "data: First line\\ndata: Second line"}]}}]}'
+    print("\nTest case 3 - Multiple 'data:' in content")
+    print(f"Input chunk: {chunk}")
+    result = iterator._common_chunk_parsing_logic(chunk)
+    print(f"Parsed result: {result}")
+    assert result["text"] == "data: First line\ndata: Second line", "Should preserve all 'data:' in content"
+
+    # Test case 4: Complete SSE response payload from a real stream
+    chunk = '''data: {"candidates": [{"content": {"role": "model","parts": [{"text": "data: line 1\\nsomedata: line 2!\\ndata: line 3"}]},"safetyRatings": [{"category": "HARM_CATEGORY_HATE_SPEECH","probability": "NEGLIGIBLE","probabilityScore": 0.14746094,"severity": "HARM_SEVERITY_NEGLIGIBLE","severityScore": 0.11279297},{"category": "HARM_CATEGORY_DANGEROUS_CONTENT","probability": "NEGLIGIBLE","probabilityScore": 0.103515625,"severity": "HARM_SEVERITY_NEGLIGIBLE","severityScore": 0.07470703},{"category": "HARM_CATEGORY_HARASSMENT","probability": "NEGLIGIBLE","probabilityScore": 0.21972656,"severity": "HARM_SEVERITY_NEGLIGIBLE","severityScore": 0.09033203},{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT","probability": "NEGLIGIBLE","probabilityScore": 0.12890625,"severity": "HARM_SEVERITY_NEGLIGIBLE","severityScore": 0.09814453}]}],"modelVersion": "gemini-1.5-flash-001","createTime": "2025-01-01T00:00:00.000000Z","responseId": "response-id-xxxxx"}'''
+    print("\nTest case 4 - Complete SSE response payload")
+    print(f"Input chunk: {chunk}")
+    result = iterator._common_chunk_parsing_logic(chunk)
+    print(f"Parsed result: {result}")
+    assert result["text"] == "data: line 1\nsomedata: line 2!\ndata: line 3", "Should preserve all data prefixes in the content"
+
+    print("\n=== All test cases passed successfully ===")
+
+
 # asyncio.run(test_acompletion_gemini_stream())
 def gemini_mock_post_streaming(url, **kwargs):
     # This generator simulates the streaming response with partial JSON content
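As background for the test cases above, a hedged sketch of how a spec-compliant Server-Sent Events consumer treats the data field: only the leading "data:" field name, plus at most one following space, is framing; everything after it is payload. This is illustrative only, not LiteLLM's actual iterator code:

# Minimal SSE data-line parser sketch (illustrative; the real logic lives in
# ModelResponseIterator._common_chunk_parsing_logic).
from typing import Optional

def sse_data_payload(line: str) -> Optional[str]:
    """Return the payload of an SSE 'data:' line, or None for other lines."""
    if not line.startswith("data:"):
        return None
    payload = line[len("data:"):]
    # The SSE spec strips a single optional space after the colon.
    return payload[1:] if payload.startswith(" ") else payload

assert sse_data_payload('data: {"text": "data: hi"}') == '{"text": "data: hi"}'

To run just the new test, something like pytest tests/local_testing/test_streaming.py::test_vertex_and_google_studio_gemini_data_prefix_stripping -s should work, assuming the repository's usual pytest setup.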
@@ -2954,6 +2997,7 @@ def test_azure_streaming_and_function_calling():
 async def test_completion_azure_ai_mistral_invalid_params(sync_mode):
     try:
         import os
+
         from litellm import stream_chunk_builder
 
         litellm.set_verbose = True
@@ -4006,12 +4050,12 @@ def test_mock_response_iterator_tool_use():
     from litellm.llms.bedrock.chat.invoke_handler import MockResponseIterator
     from litellm.types.utils import (
         ChatCompletionMessageToolCall,
+        Choices,
+        CompletionTokensDetailsWrapper,
         Function,
         Message,
-        Usage,
-        CompletionTokensDetailsWrapper,
         PromptTokensDetailsWrapper,
-        Choices,
+        Usage,
     )
 
     litellm.set_verbose = False
@@ -4084,3 +4128,5 @@ def test_deepseek_reasoning_content_completion():
         assert reasoning_content_exists
     except litellm.Timeout:
         pytest.skip("Model is timing out")