feat: support deepgram audio queries and fix streaming in openai structured outputs #1465

Open · wants to merge 3 commits into base: main
64 changes: 64 additions & 0 deletions screenpipe-js/ai-proxy/src/handlers/chat.ts
@@ -0,0 +1,64 @@
import { Env, RequestBody } from '../types';
import { createProvider } from '../providers';
import { Langfuse } from 'langfuse-node';
import { addCorsHeaders } from '../utils/cors';

/**
 * Handles chat completion requests
 * @param body Request body containing chat messages and parameters
 * @param env Environment variables
 * @param langfuse Analytics client
 * @returns Response containing AI completion
 */
export async function handleChatCompletions(body: RequestBody, env: Env, langfuse: Langfuse): Promise<Response> {
  const provider = createProvider(body.model, env);
  const trace = langfuse.trace({
    id: 'ai_call_' + Date.now(),
    name: 'ai_call',
    metadata: {
      model: body.model,
      streaming: body.stream === true,
    },
  });

  try {
    let response: Response;

    if (body.stream) {
      const stream = await provider.createStreamingCompletion(body);
      response = new Response(stream, {
        headers: {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive',
        },
      });
    } else {
      response = await provider.createCompletion(body);
    }

    trace.update({
      metadata: {
        completionStatus: 'success',
        completionTime: new Date().toISOString(),
        modelUsed: body.model,
        isStreaming: body.stream === true,
      },
      output: response.statusText,
    });

    // add CORS headers
    return addCorsHeaders(response);
  } catch (error: any) {
    trace.update({
      metadata: {
        completionStatus: 'error',
        errorTime: new Date().toISOString(),
        errorType: error.name,
        errorMessage: error.message,
      },
      output: error.message,
    });
    throw error;
  }
}
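
For reference, a minimal sketch of how this handler could be wired into the worker's fetch entry point. The route path and the LANGFUSE_* binding names are assumptions for illustration, not part of this diff:

import { Langfuse } from 'langfuse-node';
import { handleChatCompletions } from './handlers/chat';
import { Env, RequestBody } from './types';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    if (request.method === 'POST' && url.pathname === '/v1/chat/completions') {
      const body = (await request.json()) as RequestBody;
      // hypothetical binding names; the real Env fields may differ
      const langfuse = new Langfuse({
        publicKey: (env as any).LANGFUSE_PUBLIC_KEY,
        secretKey: (env as any).LANGFUSE_SECRET_KEY,
      });
      return handleChatCompletions(body, env, langfuse);
    }
    return new Response('not found', { status: 404 });
  },
};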
39 changes: 39 additions & 0 deletions screenpipe-js/ai-proxy/src/handlers/models.ts
@@ -0,0 +1,39 @@
import { Env } from '../types';
import { createProvider } from '../providers';
import { createSuccessResponse, createErrorResponse } from '../utils/cors';

/**
 * Handles model listing requests
 * @param env Environment variables
 * @returns Response with list of available AI models
 */
export async function handleModelListing(env: Env): Promise<Response> {
  try {
    const providers = {
      anthropic: createProvider('claude-3-5-sonnet-latest', env),
      openai: createProvider('gpt-4', env),
      gemini: createProvider('gemini-1.5-pro', env),
    };

    const results = await Promise.allSettled([
      providers.anthropic.listModels(),
      providers.openai.listModels(),
      providers.gemini.listModels(),
    ]);

    const models = results
      .filter(
        (result): result is PromiseFulfilledResult<{ id: string; name: string; provider: string }[]> =>
          result.status === 'fulfilled'
      )
      .flatMap((result) => result.value);

    return createSuccessResponse({ models });
  } catch (error) {
    console.error('Error fetching models:', error);
    return createErrorResponse(
      500,
      `Failed to fetch models: ${error instanceof Error ? error.message : 'Unknown error'}`
    );
  }
}
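
Both handlers assume a common provider contract. Inferred from the call sites in this diff (the actual interface lives in src/providers and may differ):

import { RequestBody } from './types';

// inferred from the calls in chat.ts and models.ts; not the literal definition
interface AIProvider {
  createCompletion(body: RequestBody): Promise<Response>;
  createStreamingCompletion(body: RequestBody): Promise<ReadableStream>;
  listModels(): Promise<{ id: string; name: string; provider: string }[]>;
}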
110 changes: 110 additions & 0 deletions screenpipe-js/ai-proxy/src/handlers/transcription.ts
@@ -0,0 +1,110 @@
import { Env } from '../types';
import { createClient, LiveTranscriptionEvents } from '@deepgram/sdk';
import { createSuccessResponse, createErrorResponse } from '../utils/cors';

/**
 * Handles audio file transcription requests
 * @param request The HTTP request containing audio data
 * @param env Environment variables
 * @returns Response with transcription results
 */
export async function handleFileTranscription(request: Request, env: Env): Promise<Response> {
  try {
    const audioBuffer = await request.arrayBuffer();
    const languages = request.headers.get('detect_language')?.split(',') || [];
    const sampleRate = request.headers.get('sample_rate') || '16000';

    // build the Deepgram query; repeated detect_language params carry
    // the comma-separated list from the request header
    const url = new URL('https://api.deepgram.com/v1/listen');
    url.searchParams.set('model', 'nova-2');
    url.searchParams.set('smart_format', 'true');
    url.searchParams.set('sample_rate', sampleRate);
    for (const lang of languages) {
      url.searchParams.append('detect_language', lang);
    }

    const deepgramResponse = await fetch(url.toString(), {
      method: 'POST',
      headers: {
        Authorization: `Token ${env.DEEPGRAM_API_KEY}`,
        'Content-Type': 'audio/wav',
      },
      body: audioBuffer,
    });

    if (!deepgramResponse.ok) {
      const errorData = await deepgramResponse.json();
      throw new Error(`Deepgram API error: ${JSON.stringify(errorData)}`);
    }

    const data = await deepgramResponse.json();
    return createSuccessResponse(data);
  } catch (error: any) {
    console.error('Error in Deepgram request:', error);
    return createErrorResponse(500, error.message);
  }
}
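
As a usage sketch, a caller might hit this handler like so. The public route (/v1/transcribe here) and host are assumptions, since the router is not part of this diff:

// hypothetical client for the file-transcription endpoint
async function transcribeWav(audio: ArrayBuffer): Promise<unknown> {
  const res = await fetch('https://ai-proxy.example.com/v1/transcribe', {
    method: 'POST',
    headers: {
      'Content-Type': 'audio/wav',
      sample_rate: '16000',
      detect_language: 'en,es', // comma-separated, matching the header parsing above
    },
    body: audio,
  });
  if (!res.ok) throw new Error(`transcription failed: ${res.status}`);
  return res.json();
}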

/**
 * Handles WebSocket upgrade for real-time transcription
 * @param request The HTTP request for WebSocket upgrade
 * @param env Environment variables
 * @returns Response with WebSocket connection
 */
export async function handleWebSocketUpgrade(request: Request, env: Env): Promise<Response> {
  try {
    const requestId = crypto.randomUUID();

    const webSocketPair = new WebSocketPair();
    const [client, server] = Object.values(webSocketPair);
    server.accept();

    // forward all query params (model, sample_rate, ...) to Deepgram
    const params = new URL(request.url).searchParams;
    const url = new URL('wss://api.deepgram.com/v1/listen');
    for (const [key, value] of params.entries()) {
      url.searchParams.set(key, value);
    }

    const deepgram = createClient(env.DEEPGRAM_API_KEY);
    const deepgramSocket = deepgram.listen.live({}, url.toString());

    deepgramSocket.on(LiveTranscriptionEvents.Open, () => {
      server.send(
        JSON.stringify({
          type: 'connected',
          message: 'WebSocket connection established',
        })
      );
    });

    // client audio -> Deepgram
    server.addEventListener('message', (event) => {
      if (deepgramSocket.getReadyState() === WebSocket.OPEN) {
        deepgramSocket.send(event.data);
      }
    });

    // Deepgram transcripts -> client
    deepgramSocket.on(LiveTranscriptionEvents.Transcript, (data) => {
      if (server.readyState === WebSocket.OPEN) {
        server.send(JSON.stringify(data));
      }
    });

    server.addEventListener('close', () => {
      deepgramSocket.requestClose();
    });

    deepgramSocket.on(LiveTranscriptionEvents.Error, (error) => {
      if (server.readyState === WebSocket.OPEN) {
        server.close(1011, 'Deepgram error: ' + error.message);
      }
    });

    return new Response(null, {
      status: 101,
      webSocket: client,
      headers: {
        'dg-request-id': requestId,
      },
    });
  } catch (error) {
    console.error('WebSocket upgrade failed:', error);
    return createErrorResponse(500, 'WebSocket upgrade failed');
  }
}
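
And a client-side sketch of the realtime path: connect, stream binary audio frames up, and read JSON transcript events back. The host and route are placeholders; only the worker-side upgrade handler is shown in this PR:

// hypothetical browser client for the realtime endpoint
const ws = new WebSocket('wss://ai-proxy.example.com/v1/listen?model=nova-2&sample_rate=16000');

ws.addEventListener('open', () => {
  // a real client would stream microphone PCM; a silent buffer keeps this runnable
  const silence = new ArrayBuffer(3200); // 100 ms of 16-bit mono audio at 16 kHz
  ws.send(silence);
});

ws.addEventListener('message', (event) => {
  const msg = JSON.parse(event.data as string);
  if (msg.type === 'connected') return; // handshake ack emitted by the worker
  console.log('transcript event:', msg);
});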
1 change: 1 addition & 0 deletions screenpipe-js/ai-proxy/src/handlers/voice-ws.ts
@@ -0,0 +1 @@
// TODO: implement