diff --git a/screenpipe-js/ai-proxy/src/handlers/chat.ts b/screenpipe-js/ai-proxy/src/handlers/chat.ts
new file mode 100644
index 0000000000..6a64afec30
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/handlers/chat.ts
@@ -0,0 +1,64 @@
+import { Env, RequestBody } from '../types';
+import { createProvider } from '../providers';
+import { Langfuse } from 'langfuse-node';
+import { addCorsHeaders } from '../utils/cors';
+
+/**
+ * Handles chat completion requests
+ * @param body Request body containing chat messages and parameters
+ * @param env Environment variables
+ * @param langfuse Analytics client
+ * @returns Response containing AI completion
+ */
+export async function handleChatCompletions(body: RequestBody, env: Env, langfuse: Langfuse): Promise<Response> {
+	const provider = createProvider(body.model, env);
+	const trace = langfuse.trace({
+		id: 'ai_call_' + Date.now(),
+		name: 'ai_call',
+		metadata: {
+			model: body.model,
+			streaming: body.stream === true,
+		},
+	});
+
+	try {
+		let response: Response;
+
+		if (body.stream) {
+			const stream = await provider.createStreamingCompletion(body);
+			response = new Response(stream, {
+				headers: {
+					'Content-Type': 'text/event-stream',
+					'Cache-Control': 'no-cache',
+					'Connection': 'keep-alive',
+				},
+			});
+		} else {
+			response = await provider.createCompletion(body);
+		}
+
+		trace.update({
+			metadata: {
+				completionStatus: 'success',
+				completionTime: new Date().toISOString(),
+				modelUsed: body.model,
+				isStreaming: body.stream === true,
+			},
+			output: response.statusText,
+		});
+
+		// add CORS headers
+		return addCorsHeaders(response);
+	} catch (error: any) {
+		trace.update({
+			metadata: {
+				completionStatus: 'error',
+				errorTime: new Date().toISOString(),
+				errorType: error.name,
+				errorMessage: error.message,
+			},
+			output: error.message,
+		});
+		throw error;
+	}
+}
\ No newline at end of file
diff --git a/screenpipe-js/ai-proxy/src/handlers/models.ts b/screenpipe-js/ai-proxy/src/handlers/models.ts
new file mode 100644
index 0000000000..255ceb81cb
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/handlers/models.ts
@@ -0,0 +1,39 @@
+import { Env } from '../types';
+import { createProvider } from '../providers';
+import { createSuccessResponse, createErrorResponse } from '../utils/cors';
+
+/**
+ * Handles model listing requests
+ * @param env Environment variables
+ * @returns Response with list of available AI models
+ */
+export async function handleModelListing(env: Env): Promise<Response> {
+	try {
+		const providers = {
+			anthropic: createProvider('claude-3-5-sonnet-latest', env),
+			openai: createProvider('gpt-4', env),
+			gemini: createProvider('gemini-1.5-pro', env),
+		};
+
+		const results = await Promise.allSettled([
+			providers.anthropic.listModels(),
+			providers.openai.listModels(),
+			providers.gemini.listModels(),
+		]);
+
+		// keep only the providers that responded successfully
+		const models = results
+			.filter(
+				(result): result is PromiseFulfilledResult<{ id: string; name: string; provider: string }[]> =>
+					result.status === 'fulfilled'
+			)
+			.flatMap((result) => result.value);
+
+		return createSuccessResponse({ models });
+	} catch (error) {
+		console.error('Error fetching models:', error);
+		return createErrorResponse(
+			500,
+			`Failed to fetch models: ${error instanceof Error ? error.message : 'Unknown error'}`
+		);
+	}
+}
\ No newline at end of file
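A minimal client sketch against the new `/v1/chat/completions` route. The base URL and token are placeholders, and for brevity it assumes SSE events arrive line-aligned (a production client would buffer partial lines):

```ts
// hypothetical client; BASE_URL and the bearer token are placeholders
const BASE_URL = 'https://ai-proxy.example.workers.dev';

async function streamChat(token: string, prompt: string): Promise<string> {
	const res = await fetch(`${BASE_URL}/v1/chat/completions`, {
		method: 'POST',
		headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
		body: JSON.stringify({
			model: 'gpt-4o',
			stream: true,
			messages: [{ role: 'user', content: prompt }],
		}),
	});
	if (!res.ok || !res.body) throw new Error(`proxy error: ${res.status}`);

	let text = '';
	const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
	for (;;) {
		const { value, done } = await reader.read();
		if (done) break;
		// the proxy emits lines like: data: {"choices":[{"delta":{"content":"..."}}]}
		for (const line of value.split('\n')) {
			if (!line.startsWith('data: ') || line.trim() === 'data: [DONE]') continue;
			text += JSON.parse(line.slice(6)).choices[0]?.delta?.content ?? '';
		}
	}
	return text;
}
```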
diff --git a/screenpipe-js/ai-proxy/src/handlers/transcription.ts b/screenpipe-js/ai-proxy/src/handlers/transcription.ts
new file mode 100644
index 0000000000..e79e52d2a8
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/handlers/transcription.ts
@@ -0,0 +1,110 @@
+import { Env } from '../types';
+import { createClient, LiveTranscriptionEvents } from '@deepgram/sdk';
+import { createSuccessResponse, createErrorResponse } from '../utils/cors';
+
+/**
+ * Handles audio file transcription requests
+ * @param request The HTTP request containing audio data
+ * @param env Environment variables
+ * @returns Response with transcription results
+ */
+export async function handleFileTranscription(request: Request, env: Env): Promise<Response> {
+	try {
+		const audioBuffer = await request.arrayBuffer();
+		const languages = request.headers.get('detect_language')?.split(',') || [];
+		const sampleRate = request.headers.get('sample_rate') || '16000';
+
+		const deepgramResponse = await fetch(
+			'https://api.deepgram.com/v1/listen?model=nova-2&smart_format=true&sample_rate=' +
+				sampleRate +
+				(languages.length > 0 ? '&' + languages.map((lang) => `detect_language=${lang}`).join('&') : ''),
+			{
+				method: 'POST',
+				headers: {
+					Authorization: `Token ${env.DEEPGRAM_API_KEY}`,
+					'Content-Type': 'audio/wav',
+				},
+				body: audioBuffer,
+			}
+		);
+
+		if (!deepgramResponse.ok) {
+			const errorData = await deepgramResponse.json();
+			throw new Error(`Deepgram API error: ${JSON.stringify(errorData)}`);
+		}
+
+		const data: string | object = await deepgramResponse.json();
+		return createSuccessResponse(data);
+	} catch (error: any) {
+		console.error('Error in Deepgram request:', error);
+		return createErrorResponse(500, error.message);
+	}
+}
+
+/**
+ * Handles WebSocket upgrade for real-time transcription
+ * @param request The HTTP request for WebSocket upgrade
+ * @param env Environment variables
+ * @returns Response with WebSocket connection
+ */
+export async function handleWebSocketUpgrade(request: Request, env: Env): Promise<Response> {
+	try {
+		const requestId = crypto.randomUUID();
+
+		const webSocketPair = new WebSocketPair();
+		const [client, server] = Object.values(webSocketPair);
+		server.accept();
+
+		let params = new URL(request.url).searchParams;
+		let url = new URL('wss://api.deepgram.com/v1/listen');
+
+		// forward every query param from the client to Deepgram
+		for (let [key, value] of params.entries()) {
+			url.searchParams.set(key, value);
+		}
+
+		let deepgram = createClient(env.DEEPGRAM_API_KEY);
+		let deepgramSocket = deepgram.listen.live({}, url.toString());
+
+		deepgramSocket.on(LiveTranscriptionEvents.Open, () => {
+			server.send(
+				JSON.stringify({
+					type: 'connected',
+					message: 'WebSocket connection established',
+				})
+			);
+		});
+
+		// passthrough: client -> Deepgram
+		server.addEventListener('message', (event) => {
+			if (deepgramSocket.getReadyState() === WebSocket.OPEN) {
+				deepgramSocket.send(event.data);
+			}
+		});
+
+		// passthrough: Deepgram -> client
+		deepgramSocket.on(LiveTranscriptionEvents.Transcript, (data) => {
+			if (server.readyState === WebSocket.OPEN) {
+				server.send(JSON.stringify(data));
+			}
+		});
+
+		server.addEventListener('close', () => {
+			deepgramSocket.requestClose();
+		});
+
+		deepgramSocket.on(LiveTranscriptionEvents.Error, (error) => {
+			if (server.readyState === WebSocket.OPEN) {
+				server.close(1011, 'Deepgram error: ' + error.message);
+			}
+		});
+
+		return new Response(null, {
+			status: 101,
+			webSocket: client,
+			headers: {
+				'dg-request-id': requestId,
+			},
+		});
+	} catch (error) {
+		console.error('WebSocket upgrade failed:', error);
+		return createErrorResponse(500, 'WebSocket upgrade failed');
+	}
+}
\ No newline at end of file
diff --git a/screenpipe-js/ai-proxy/src/handlers/voice-ws.ts b/screenpipe-js/ai-proxy/src/handlers/voice-ws.ts
new file mode 100644
index 0000000000..79a745897b
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/handlers/voice-ws.ts
@@ -0,0 +1 @@
+// TODO: implement
\ No newline at end of file
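A sketch of how a client might call the file-transcription route. The base URL, token, and file path are placeholders; the response shape mirrors Deepgram's prerecorded API, which nests the transcript per channel and alternative:

```ts
// hypothetical usage of POST /v1/listen; runs under Node 18+ for fs and fetch
import { readFile } from 'node:fs/promises';

async function transcribeWav(path: string): Promise<string> {
	const audio = await readFile(path);
	const res = await fetch('https://ai-proxy.example.workers.dev/v1/listen', {
		method: 'POST',
		headers: {
			Authorization: 'Bearer <token>',
			'Content-Type': 'audio/wav',
			sample_rate: '16000',     // read by the handler; defaults to 16000
			detect_language: 'en,es', // optional comma-separated list
		},
		body: audio,
	});
	if (!res.ok) throw new Error(`transcription failed: ${res.status}`);
	const data: any = await res.json();
	return data.results?.channels[0]?.alternatives[0]?.transcript ?? '';
}
```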
diff --git a/screenpipe-js/ai-proxy/src/handlers/voice.ts b/screenpipe-js/ai-proxy/src/handlers/voice.ts
new file mode 100644
index 0000000000..22d9d5ac9c
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/handlers/voice.ts
@@ -0,0 +1,373 @@
+import { Env, TextToSpeechRequest, VoiceQueryResult, TranscriptionOptions, TTSOptions, TTSVoiceModelType, TranscriptionModelType } from '../types';
+import { transcribeAudio, textToSpeech, validateAudioInput } from '../utils/voice-utils';
+import { createProvider } from '../providers';
+import { Langfuse } from 'langfuse-node';
+import { createSuccessResponse, createErrorResponse, addCorsHeaders } from '../utils/cors';
+
+/**
+ * Handles voice input transcription requests
+ * @param request The HTTP request containing audio data
+ * @param env Environment variables
+ * @returns Response with transcription result
+ */
+export async function handleVoiceTranscription(request: Request, env: Env): Promise<Response> {
+	const validation = await validateAudioInput(request);
+	if (!validation.valid || !validation.audioBuffer) {
+		return createErrorResponse(400, validation.error || 'Invalid audio input');
+	}
+
+	const languages = request.headers.get('detect_language')?.split(',') || ['en'];
+	const sampleRate = request.headers.get('sample_rate') || '16000';
+	// 'transcription-model' matches the header name used by the other voice endpoints
+	const model: TranscriptionModelType = (request.headers.get('transcription-model') as TranscriptionModelType) || 'nova-3';
+	const diarize = request.headers.get('diarize') === 'true';
+
+	// Transcribe audio
+	const transcriptionResult = await transcribeAudio(validation.audioBuffer, env, {
+		languages,
+		sampleRate,
+		model,
+		diarize,
+		smartFormat: true,
+	});
+
+	if (transcriptionResult.error || !transcriptionResult.text) {
+		return createErrorResponse(
+			400,
+			transcriptionResult.error || 'No speech detected in the audio'
+		);
+	}
+
+	return createSuccessResponse({
+		transcription: transcriptionResult.text,
+		confidence: transcriptionResult.confidence,
+		language: transcriptionResult.language,
+		words: transcriptionResult.words,
+	});
+}
+
+/**
+ * Handles voice query requests (audio in, AI text response out)
+ * @param request HTTP request with audio data
+ * @param env Environment variables
+ * @param langfuse Analytics client
+ * @returns HTTP response with transcription and AI response
+ */
+export async function handleVoiceQuery(request: Request, env: Env, langfuse: Langfuse): Promise<Response> {
+	const trace = langfuse.trace({
+		id: 'voice_query_' + Date.now(),
+		name: 'voice_query',
+		metadata: {
+			input_type: 'audio',
+		},
+	});
+
+	try {
+		const validation = await validateAudioInput(request);
+		if (!validation.valid || !validation.audioBuffer) {
+			return createErrorResponse(400, validation.error || 'Invalid audio input');
+		}
+
+		const transcriptionOptions: TranscriptionOptions = {
+			model: (request.headers.get('transcription-model') as TranscriptionOptions['model']) || 'nova-3',
+			languages: request.headers.get('detect_language')?.split(',') || ['en'],
+			sampleRate: request.headers.get('sample_rate') || '16000',
+			smartFormat: true,
+			diarize: request.headers.get('diarize') === 'true',
+			punctuate: request.headers.get('punctuate') !== 'false',
+		};
+
+		const aiModel = request.headers.get('ai-model') || 'gpt-4o';
+		const systemPrompt = request.headers.get('system-prompt') || 'You are a helpful assistant.';
+
+		const transcriptionResult = await transcribeAudio(validation.audioBuffer, env, transcriptionOptions);
+
+		trace.update({
+			metadata: {
+				transcription: transcriptionResult.text,
+				confidence: transcriptionResult.confidence,
+				language: transcriptionResult.language,
+				transcription_time: new Date().toISOString(),
+			},
+		});
+
+		if (transcriptionResult.error || !transcriptionResult.text) {
+			return createErrorResponse(400, transcriptionResult.error || 'No speech detected in the audio');
+		}
+
+		const provider = createProvider(aiModel, env);
+
+		const aiResponseData = await provider.createCompletion({
+			model: aiModel,
+			messages: [
+				{
+					role: 'system',
+					content: systemPrompt,
+				},
+				{
+					role: 'user',
+					content: transcriptionResult.text,
+				},
+			],
+			stream: false,
+		});
+
+		const aiResponse = await aiResponseData.json();
+
+		trace.update({
+			metadata: {
+				model_used: aiModel,
+				completion_time: new Date().toISOString(),
+				completion_status: 'success',
+			},
+			output: aiResponse,
+		});
+
+		const result: VoiceQueryResult = {
+			transcription: transcriptionResult.text,
+			transcription_details: {
+				confidence: transcriptionResult.confidence,
+				language: transcriptionResult.language,
+				words: transcriptionResult.words,
+			},
+			ai_response: aiResponse,
+		};
+
+		return createSuccessResponse(result);
+	} catch (error: any) {
+		console.error('Error in voice query:', error);
+
+		trace.update({
+			metadata: {
+				error: error.message,
+				error_time: new Date().toISOString(),
+			},
+		});
+
+		return createErrorResponse(500, `Error processing voice query: ${error.message}`);
+	}
+}
+
+/**
+ * Handles text-to-speech conversion (REST API)
+ * @param request HTTP request with text to convert
+ * @param env Environment variables
+ * @param langfuse Analytics client
+ * @returns HTTP response with audio data
+ */
+export async function handleTextToSpeech(request: Request, env: Env, langfuse: Langfuse): Promise<Response> {
+	const trace = langfuse.trace({
+		id: 'text_to_speech_' + Date.now(),
+		name: 'text_to_speech',
+		metadata: {
+			input_type: 'text',
+		},
+	});
+
+	try {
+		const { text, voice = 'aura-asteria-en' } = (await request.json()) as TextToSpeechRequest;
+
+		if (!text || typeof text !== 'string') {
+			return createErrorResponse(400, 'Missing or invalid text parameter');
+		}
+
+		if (voice && !isValidVoiceModel(voice)) {
+			return createErrorResponse(400, `Invalid voice model: ${voice}. See documentation for supported models.`);
+		}
+
+		trace.update({
+			metadata: {
+				text_length: text.length,
+				voice: voice,
+			},
+		});
+
+		const ttsOptions: TTSOptions = {
+			voice: voice as TTSVoiceModelType,
+			encoding: 'linear16', // Default to WAV format
+		};
+
+		const audioBuffer = await textToSpeech(text, env, ttsOptions);
+
+		if (!audioBuffer) {
+			return createErrorResponse(500, 'Failed to convert text to speech');
+		}
+
+		trace.update({
+			metadata: {
+				tts_status: 'success',
+				audio_size_bytes: audioBuffer.byteLength,
+				completion_time: new Date().toISOString(),
+			},
+		});
+
+		const response = new Response(audioBuffer, {
+			headers: {
+				'Content-Type': 'audio/wav',
+			},
+		});
+
+		return addCorsHeaders(response);
+	} catch (error: any) {
+		console.error('Error in text-to-speech:', error);
+
+		trace.update({
+			metadata: {
+				tts_status: 'error',
+				error: error.message,
+				error_time: new Date().toISOString(),
+			},
+		});
+
+		return createErrorResponse(500, `Error converting text to speech: ${error.message}`);
+	}
+}
+
+/**
+ * Handles voice chat requests (audio in, audio out)
+ * @param request HTTP request with audio data
+ * @param env Environment variables
+ * @param langfuse Analytics client
+ * @returns HTTP response with audio data
+ */
+export async function handleVoiceChat(request: Request, env: Env, langfuse: Langfuse): Promise<Response> {
+	const trace = langfuse.trace({
+		id: 'voice_chat_' + Date.now(),
+		name: 'voice_chat',
+		metadata: {
+			input_type: 'audio',
+			output_type: 'audio',
+		},
+	});
+
+	try {
+		const validation = await validateAudioInput(request);
+		if (!validation.valid || !validation.audioBuffer) {
+			return createErrorResponse(400, validation.error || 'Invalid audio input');
+		}
+
+		const transcriptionOptions: TranscriptionOptions = {
+			model: (request.headers.get('transcription-model') as TranscriptionOptions['model']) || 'nova-3',
+			languages: request.headers.get('detect_language')?.split(',') || ['en'],
+			sampleRate: request.headers.get('sample_rate') || '16000',
+			smartFormat: true,
+			diarize: request.headers.get('diarize') === 'true',
+			punctuate: request.headers.get('punctuate') !== 'false',
+		};
+
+		const aiModel = request.headers.get('ai-model') || 'gpt-4o';
+		const systemPrompt = request.headers.get('system-prompt') || 'You are a helpful assistant. Keep your responses concise.';
+		const voice = (request.headers.get('voice') as TTSVoiceModelType) || 'aura-asteria-en';
+
+		if (!isValidVoiceModel(voice)) {
+			return createErrorResponse(400, `Invalid voice model: ${voice}. See documentation for supported models.`);
+		}
+
+		const transcriptionResult = await transcribeAudio(validation.audioBuffer, env, transcriptionOptions);
+
+		trace.update({
+			metadata: {
+				transcription: transcriptionResult.text,
+				confidence: transcriptionResult.confidence,
+				language: transcriptionResult.language,
+				transcription_time: new Date().toISOString(),
+			},
+		});
+
+		if (transcriptionResult.error || !transcriptionResult.text) {
+			return createErrorResponse(400, transcriptionResult.error || 'No speech detected in the audio');
+		}
+
+		const provider = createProvider(aiModel, env);
+
+		const aiResponseData = await provider.createCompletion({
+			model: aiModel,
+			messages: [
+				{
+					role: 'system',
+					content: systemPrompt,
+				},
+				{
+					role: 'user',
+					content: transcriptionResult.text,
+				},
+			],
+			stream: false,
+		});
+
+		const aiResponseJson: any = await aiResponseData.json();
+		const aiResponseText = aiResponseJson.choices[0]?.message?.content || '';
+
+		trace.update({
+			metadata: {
+				model_used: aiModel,
+				ai_completion_time: new Date().toISOString(),
+				ai_response_length: aiResponseText.length,
+			},
+		});
+
+		const ttsOptions: TTSOptions = {
+			voice: voice,
+			encoding: 'linear16', // WAV format
+		};
+
+		const audioBuffer = await textToSpeech(aiResponseText, env, ttsOptions);
+
+		if (!audioBuffer) {
+			return createErrorResponse(500, 'Failed to convert AI response to speech');
+		}
+
+		trace.update({
+			metadata: {
+				tts_status: 'success',
+				audio_size_bytes: audioBuffer.byteLength,
+				completion_time: new Date().toISOString(),
+			},
+		});
+
+		const response = new Response(audioBuffer, {
+			headers: {
+				'Content-Type': 'audio/wav',
+				// header values must be single-line; long transcriptions may need trimming or encoding
+				'X-Transcription': transcriptionResult.text,
+				'X-AI-Response': aiResponseText,
+				'Access-Control-Expose-Headers': 'X-Transcription, X-AI-Response',
+			},
+		});
+
+		return addCorsHeaders(response);
+	} catch (error: any) {
+		console.error('Error in voice chat:', error);
+
+		trace.update({
+			metadata: {
+				error: error.message,
+				error_time: new Date().toISOString(),
+			},
+		});
+
+		return createErrorResponse(500, `Error processing voice chat: ${error.message}`);
+	}
+}
+
+/**
+ * Validates if a string is a valid voice model
+ * @param voice Voice model to validate
+ * @returns True if valid, false otherwise
+ */
+function isValidVoiceModel(voice: string): voice is TTSVoiceModelType {
+	const validVoices: TTSVoiceModelType[] = [
+		'aura-asteria-en',
+		'aura-luna-en',
+		'aura-stella-en',
+		'aura-athena-en',
+		'aura-hera-en',
+		'aura-orion-en',
+		'aura-arcas-en',
+		'aura-perseus-en',
+		'aura-angus-en',
+		'aura-orpheus-en',
+		'aura-helios-en',
+		'aura-zeus-en',
+	];
+
+	return validVoices.includes(voice as TTSVoiceModelType);
+}
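A round-trip sketch for the audio-in/audio-out route. The URL and token are placeholders; the request headers mirror the ones `handleVoiceChat` reads, and the two texts come back in response headers alongside the WAV body:

```ts
// hypothetical client for POST /v1/voice/chat
async function voiceChat(audio: ArrayBuffer): Promise<{ transcription: string; reply: string; audio: ArrayBuffer }> {
	const res = await fetch('https://ai-proxy.example.workers.dev/v1/voice/chat', {
		method: 'POST',
		headers: {
			Authorization: 'Bearer <token>',
			'Content-Type': 'audio/wav',
			'ai-model': 'gpt-4o',
			voice: 'aura-asteria-en',
			'system-prompt': 'You are a helpful assistant. Keep your responses concise.',
		},
		body: audio,
	});
	if (!res.ok) throw new Error(`voice chat failed: ${res.status}`);
	return {
		transcription: res.headers.get('X-Transcription') ?? '',
		reply: res.headers.get('X-AI-Response') ?? '',
		audio: await res.arrayBuffer(), // synthesized WAV of the AI reply
	};
}
```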
diff --git a/screenpipe-js/ai-proxy/src/index.ts b/screenpipe-js/ai-proxy/src/index.ts
index eaaec8626b..cebbcbd018 100644
--- a/screenpipe-js/ai-proxy/src/index.ts
+++ b/screenpipe-js/ai-proxy/src/index.ts
@@ -1,278 +1,16 @@
-import { Langfuse } from 'langfuse-node';
-import { verifyToken } from '@clerk/backend';
-import { createProvider } from './providers';
-import { Env, RequestBody } from './types';
 import * as Sentry from '@sentry/cloudflare';
-import { Deepgram, DeepgramClient, LiveClient } from '@deepgram/sdk';
-import { createClient, LiveTranscriptionEvents } from '@deepgram/sdk';
-
-// Add cache for subscription status
-class SubscriptionCache {
-	private cache: Map<string, { isValid: boolean; timestamp: number }>;
-	private readonly CACHE_TTL = 5 * 60 * 1000; // 5 minutes in milliseconds
-
-	constructor() {
-		this.cache = new Map();
-	}
-
-	get(userId: string): boolean | null {
-		const entry = this.cache.get(userId);
-		if (!entry) return null;
-
-		if (Date.now() - entry.timestamp > this.CACHE_TTL) {
-			this.cache.delete(userId);
-			return null;
-		}
-
-		return entry.isValid;
-	}
-
-	set(userId: string, isValid: boolean) {
-		this.cache.set(userId, {
-			isValid,
-			timestamp: Date.now(),
-		});
-	}
-}
-
-const subscriptionCache = new SubscriptionCache();
-
-async function validateSubscription(env: Env, userId: string): Promise<boolean> {
-	console.log('validating user id has cloud sub', userId);
-	// Check cache first
-	const cached = subscriptionCache.get(userId);
-	if (cached !== null) {
-		return cached;
-	}
-
-	const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
-
-	if (UUID_REGEX.test(userId)) {
-		try {
-			const response = await fetch(`${env.SUPABASE_URL}/rest/v1/rpc/has_active_cloud_subscription`, {
-				method: 'POST',
-				headers: {
-					apikey: env.SUPABASE_ANON_KEY,
-					Authorization: `Bearer ${env.SUPABASE_ANON_KEY}`,
-					'Content-Type': 'application/json',
-				},
-				body: JSON.stringify({ input_user_id: userId }),
-			});
-
-			if (!response.ok) {
-				console.error('Supabase error:', await response.text());
-				return false;
-			}
-			if (!response.ok) {
-				console.error('Supabase error:', await response.text());
-				return false;
-			}
-
-			const isValid: boolean = await response.json();
-			subscriptionCache.set(userId, isValid);
-			return isValid;
-		} catch (error) {
-			console.error('Error checking subscription:', error);
-			return false;
-		}
-	}
-
-	// If not a UUID, return false to allow Clerk verification to proceed
-	return false;
-}
-
-async function verifyClerkToken(env: Env, token: string): Promise<boolean> {
-	console.log('verifying clerk token', token);
-	try {
-		const payload = await verifyToken(token, {
-			secretKey: env.CLERK_SECRET_KEY,
-		});
-		return payload.sub !== null;
-	} catch (error) {
-		console.error('clerk verification failed:', error);
-		return false;
-	}
-}
-
-export class RateLimiter {
-	private state: DurableObjectState;
-	private requests: Map<string, { count: number; lastReset: number }>;
-
-	constructor(state: DurableObjectState) {
-		this.state = state;
-		this.requests = new Map();
-	}
-
-	async fetch(request: Request) {
-		const ip = request.headers.get('cf-connecting-ip') || 'unknown';
-		const url = new URL(request.url);
-		const now = Date.now();
-
-		// different limits for different endpoints
-		const limits: Record<string, { rpm: number; window: number }> = {
-			'/v1/chat/completions': { rpm: 20, window: 60000 }, // 20 requests per minute for openai
-			default: { rpm: 60, window: 60000 }, // 60 rpm for other endpoints
-		};
-
-		const limit = limits[url.pathname] || limits.default;
-
-		// get or initialize request tracking
-		let tracking = this.requests.get(ip) || { count: 0, lastReset: now };
-
-		// reset if window expired
-		if (now - tracking.lastReset > limit.window) {
-			tracking = { count: 0, lastReset: now };
-		}
-
-		tracking.count++;
-		this.requests.set(ip, tracking);
-
-		const isAllowed = tracking.count <= limit.rpm;
-
-		return new Response(
-			JSON.stringify({
-				allowed: isAllowed,
-				remaining: Math.max(0, limit.rpm - tracking.count),
-				reset_in: Math.ceil((tracking.lastReset + limit.window - now) / 1000),
-			})
-		);
-	}
-}
-
-async function handleChatCompletions(body: RequestBody, env: Env): Promise<Response> {
-	const provider = createProvider(body.model, env);
-
-	if (body.stream) {
-		const stream = await provider.createStreamingCompletion(body);
-		return new Response(stream, {
-			headers: {
-				'Content-Type': 'text/event-stream',
-				'Cache-Control': 'no-cache',
-				Connection: 'keep-alive',
-			},
-		});
-	}
-
-	return await provider.createCompletion(body);
-}
-
-async function handleOptions(request: Request) {
-	const corsHeaders = {
-		'Access-Control-Allow-Origin': '*',
-		'Access-Control-Allow-Methods': 'GET, HEAD, POST, OPTIONS',
-		'Access-Control-Allow-Headers': '*',
-		'Access-Control-Max-Age': '86400',
-	};
-
-	// Handle CORS preflight requests
-	if (
-		request.headers.get('Origin') !== null &&
-		request.headers.get('Access-Control-Request-Method') !== null &&
-		request.headers.get('Access-Control-Request-Headers') !== null
-	) {
-		return new Response(null, {
-			headers: {
-				...corsHeaders,
-				'Access-Control-Allow-Headers': request.headers.get('Access-Control-Request-Headers') || '*',
-			},
-		});
-	}
-
-	// Handle standard OPTIONS request
-	return new Response(null, {
-		headers: {
-			Allow: 'GET, HEAD, POST, OPTIONS',
-		},
-	});
-}
-
-async function handleWebSocketUpgrade(request: Request, env: Env): Promise<Response> {
-	try {
-		// Generate a unique request ID
-		const requestId = crypto.randomUUID();
-
-		// Create WebSocket pair
-		const webSocketPair = new WebSocketPair();
-		const [client, server] = Object.values(webSocketPair);
-		server.accept();
-
-		let params = new URL(request.url).searchParams;
-		let sampleRate = params.get('sample_rate');
-
-		let url = new URL('wss://api.deepgram.com/v1/listen');
-		// for each key in params, set the url search param
-		for (let [key, value] of params.entries()) {
-			url.searchParams.set(key, value);
-		}
-
-		let deepgram: DeepgramClient | null = null;
-		try {
-			deepgram = createClient(env.DEEPGRAM_API_KEY);
-		} catch (error: any) {
-			console.error('Error creating Deepgram client:', error);
-			return new Response(`Deepgram client creation failed: ${error.message}`, { status: 500 });
-		}
-
-		if (!deepgram) {
-			return new Response('Deepgram client creation failed', { status: 500 });
-		}
-
-		let deepgramSocket: LiveClient;
-
-		try {
-			deepgramSocket = deepgram.listen.live({}, url.toString());
-		} catch (error: any) {
-			console.error('Error creating Deepgram socket:', error);
-			return new Response(`Deepgram socket creation failed: ${error.message}`, { status: 500 });
-		}
-
-		deepgramSocket.on(LiveTranscriptionEvents.Open, () => {
-			server.send(
-				JSON.stringify({
-					type: 'connected',
-					message: 'WebSocket connection established',
-				})
-			);
-		});
-
-		// Simple passthrough: client -> Deepgram
-		server.addEventListener('message', (event) => {
-			if (deepgramSocket.getReadyState() === WebSocket.OPEN) {
-				deepgramSocket.send(event.data);
-			}
-		});
-
-		// Simple passthrough: Deepgram -> client
-		deepgramSocket.on(LiveTranscriptionEvents.Transcript, (data) => {
-			if (server.readyState === WebSocket.OPEN) {
-				server.send(JSON.stringify(data));
-			}
-		});
-
-		// Handle connection close
-		server.addEventListener('close', () => {
-			deepgramSocket.requestClose();
-		});
-
-		// Handle errors
-		deepgramSocket.on(LiveTranscriptionEvents.Error, (error) => {
-			if (server.readyState === WebSocket.OPEN) {
-				server.close(1011, 'Deepgram error: ' + error.message);
-			}
-		});
-
-		return new Response(null, {
-			status: 101,
-			webSocket: client,
-			headers: {
-				'dg-request-id': requestId,
-			},
-		});
-	} catch (error) {
-		console.error('WebSocket upgrade failed:', error);
-		return new Response('WebSocket upgrade failed', { status: 500 });
-	}
-}
+import { Env, RequestBody } from './types';
+import { handleOptions, createSuccessResponse, createErrorResponse } from './utils/cors';
+import { validateAuth } from './utils/auth';
+import { RateLimiter, checkRateLimit } from './utils/rate-limiter';
+import { setupAnalytics } from './services/analytics';
+import { handleChatCompletions } from './handlers/chat';
+import { handleModelListing } from './handlers/models';
+import { handleFileTranscription, handleWebSocketUpgrade } from './handlers/transcription';
+import { handleVoiceTranscription, handleVoiceQuery, handleTextToSpeech, handleVoiceChat } from './handlers/voice';
+// import { handleTTSWebSocketUpgrade } from './handlers/voice-ws';
+
+export { RateLimiter };
 
 export default Sentry.withSentry(
 	(env) => ({
@@ -284,310 +22,84 @@ export default Sentry.withSentry(
 	{
 		/**
 		 * This is the standard fetch handler for a Cloudflare Worker
-		 *
-		 * @param request - The request submitted to the Worker from the client
-		 * @param env - The interface to reference bindings declared in wrangler.toml
-		 * @param ctx - The execution context of the Worker
-		 * @returns The response to be sent back to the client
+		 * @param request The HTTP request
+		 * @param env Environment variables
+		 * @param ctx Execution context
+		 * @returns HTTP response
 		 */
 		async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
-			const langfuse = new Langfuse({
-				publicKey: env.LANGFUSE_PUBLIC_KEY,
-				secretKey: env.LANGFUSE_SECRET_KEY,
-				baseUrl: 'https://us.cloud.langfuse.com',
-			});
-
-			langfuse.debug();
-			langfuse.on('error', (error) => {
-				console.error('langfuse error:', error);
-			});
-
-			// Modify your fetch handler to use this for OPTIONS requests
-			if (request.method === 'OPTIONS') {
-				return handleOptions(request);
-			}
-
-			const ip = request.headers.get('cf-connecting-ip') || 'unknown';
-			const rateLimiterId = env.RATE_LIMITER.idFromName(ip);
-			const rateLimiter = env.RATE_LIMITER.get(rateLimiterId);
-
-			// Check rate limit
-			const rateLimitResponse = await rateLimiter.fetch(request.url);
-			const rateLimitData = (await rateLimitResponse.json()) as { allowed: boolean; remaining: number; reset_in: number };
-
-			if (!rateLimitData.allowed) {
-				const response = new Response(
-					JSON.stringify({
-						error: 'rate limit exceeded',
-						retry_after: 60, // seconds
-					}),
-					{
-						status: 429,
-						headers: {
-							'Access-Control-Allow-Origin': '*',
-							'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
-							'Access-Control-Allow-Headers': '*',
-							'Access-Control-Allow-Credentials': 'true',
-							'Access-Control-Max-Age': '86400',
-							'Retry-After': '60',
-						},
-					}
-				);
-				response.headers.append('Vary', 'Origin');
-				return response;
-			}
+			const langfuse = setupAnalytics(env);
 
 			try {
+				if (request.method === 'OPTIONS') {
+					return handleOptions(request);
+				}
+
+				const rateLimit = await checkRateLimit(request, env);
+				if (!rateLimit.allowed && rateLimit.response) {
+					return rateLimit.response;
+				}
+
 				const url = new URL(request.url);
 				const path = url.pathname;
 
+				console.log('path', path);
+
+				// Handle WebSocket upgrade for real-time transcription
 				const upgradeHeader = request.headers.get('upgrade')?.toLowerCase();
 				if (path === '/v1/listen' && upgradeHeader === 'websocket') {
 					console.log('websocket request to /v1/listen detected, bypassing auth');
 					return await handleWebSocketUpgrade(request, env);
 				}
 
-				// Add auth check for protected routes
 				if (path !== '/test') {
-					const authHeader = request.headers.get('Authorization');
-					console.log('authHeader', authHeader);
-					if (!authHeader || !(authHeader.startsWith('Bearer ') || authHeader.startsWith('Token '))) {
-						const response = new Response(JSON.stringify({ error: 'unauthorized' }), {
-							status: 401,
-							headers: {
-								'Access-Control-Allow-Origin': '*',
-								'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
-								'Access-Control-Allow-Headers': '*',
-								'Access-Control-Allow-Credentials': 'true',
-								'Access-Control-Max-Age': '86400',
-							},
-						});
-						response.headers.append('Vary', 'Origin');
-						return response;
-					}
-
-					const token = authHeader.split(' ')[1];
-
-					let isValid = await validateSubscription(env, token);
-
-					// If not valid, try to verify as a Clerk token
-					if (!isValid) {
-						isValid = await verifyClerkToken(env, token);
-					}
-
-					if (!isValid) {
-						console.log('all validation attempts failed');
-						const response = new Response(JSON.stringify({ error: 'invalid subscription' }), {
-							status: 401,
-							headers: {
-								'Access-Control-Allow-Origin': '*',
-								'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
-								'Access-Control-Allow-Headers': '*',
-								'Access-Control-Allow-Credentials': 'true',
-								'Access-Control-Max-Age': '86400',
-							},
-						});
-						response.headers.append('Vary', 'Origin');
-						return response;
-					}
+					const authResult = await validateAuth(request, env);
+					if (!authResult.isValid) {
+						return createErrorResponse(401, authResult.error || 'unauthorized');
+					}
 				}
 
-				console.log('path', path);
-
 				if (path === '/test') {
-					const response = new Response('ai proxy is working!', {
-						status: 200,
-						headers: {
-							'Access-Control-Allow-Origin': '*',
-							'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
-							'Access-Control-Allow-Headers': '*',
-							'Access-Control-Allow-Credentials': 'true',
-							'Access-Control-Max-Age': '86400',
-						},
-					});
-					response.headers.append('Vary', 'Origin');
-					return response;
+					return createSuccessResponse('ai proxy is working!');
 				}
 
 				if (path === '/v1/chat/completions' && request.method === 'POST') {
 					const body = (await request.json()) as RequestBody;
-
-					const trace = langfuse.trace({
-						id: 'ai_call_' + Date.now(),
-						name: 'ai_call',
-						metadata: {
-							model: body.model,
-							streaming: body.stream === true,
-						},
-					});
-
-					try {
-						const response = await handleChatCompletions(body, env);
-						trace.update({
-							metadata: {
-								completionStatus: 'success',
-								completionTime: new Date().toISOString(),
-								modelUsed: body.model,
-								isStreaming: body.stream === true,
-							},
-							output: response.statusText,
-						});
-						response.headers.set('Access-Control-Allow-Origin', '*');
-						response.headers.set('Access-Control-Allow-Methods', 'GET, HEAD, POST, OPTIONS');
-						response.headers.append('Vary', 'Origin');
-						return response;
-					} catch (error: any) {
-						trace.update({
-							metadata: {
-								completionStatus: 'error',
-								errorTime: new Date().toISOString(),
-								errorType: error.name,
-								errorMessage: error.message,
-							},
-							output: error.message,
-						});
-						throw error;
-					}
+					return await handleChatCompletions(body, env, langfuse);
 				}
 
 				if (path === '/v1/listen' && request.method === 'POST') {
-					// Get the raw body instead of form data
-					const audioBuffer = await request.arrayBuffer();
-					const languages = request.headers.get('detect_language')?.split(',') || [];
-					const sampleRate = request.headers.get('sample_rate') || '16000';
-					try {
-						const deepgramResponse = await fetch(
-							'https://api.deepgram.com/v1/listen?model=nova-2&smart_format=true&sample_rate=' +
-								sampleRate +
-								(languages.length > 0 ? '&' + languages.map((lang) => `detect_language=${lang}`).join('&') : ''),
-							{
-								method: 'POST',
-								headers: {
-									Authorization: `Token ${env.DEEPGRAM_API_KEY}`,
-									'Content-Type': 'audio/wav', // Set correct content type
-								},
-								body: audioBuffer,
-							}
-						);
-
-						if (!deepgramResponse.ok) {
-							const errorData = await deepgramResponse.json();
-							throw new Error(`Deepgram API error: ${JSON.stringify(errorData)}`);
-						}
-
-						const data = await deepgramResponse.json();
-						const response = new Response(JSON.stringify(data), {
-							headers: {
-								'Access-Control-Allow-Origin': '*',
-								'Access-Control-Allow-Methods': 'GET, HEAD, POST, OPTIONS',
-								'Access-Control-Allow-Headers': '*',
-								'Content-Type': 'application/json',
-							},
-						});
-						response.headers.append('Vary', 'Origin');
-						return response;
-					} catch (error: any) {
-						console.error('Error in Deepgram request:', error);
-						const response = new Response(
-							JSON.stringify({
-								error: error.message,
-								details: error.stack,
-							}),
-							{
-								status: 500,
-								headers: {
-									'Access-Control-Allow-Origin': '*',
-									'Access-Control-Allow-Methods': 'GET, HEAD, POST, OPTIONS',
-									'Access-Control-Allow-Headers': '*',
-									'Content-Type': 'application/json',
-								},
-							}
-						);
-						response.headers.append('Vary', 'Origin');
-						return response;
-					}
+					return await handleFileTranscription(request, env);
 				}
 
 				if (path === '/v1/models' && request.method === 'GET') {
-					try {
-						// Create instances of all providers
-						const providers = {
-							anthropic: createProvider('claude-3-5-sonnet-latest', env),
-							openai: createProvider('gpt-4', env),
-							gemini: createProvider('gemini-1.5-pro', env),
-						};
-
-						// Fetch models from all providers in parallel
-						const results = await Promise.allSettled([
-							providers.anthropic.listModels(),
-							providers.openai.listModels(),
-							providers.gemini.listModels(),
-						]);
-
-						// Combine and filter out failed requests
-						const models = results
-							.filter(
-								(result): result is PromiseFulfilledResult<{ id: string; name: string; provider: string }[]> =>
-									result.status === 'fulfilled'
-							)
-							.flatMap((result) => result.value);
-
-						const response = new Response(JSON.stringify({ models }), {
-							headers: {
-								'Access-Control-Allow-Origin': '*',
-								'Access-Control-Allow-Methods': 'GET, HEAD, POST, OPTIONS',
-								'Access-Control-Allow-Headers': '*',
-								'Content-Type': 'application/json',
-							},
-						});
-						response.headers.append('Vary', 'Origin');
-						return response;
-					} catch (error) {
-						console.error('Error fetching models:', error);
-						const response = new Response(
-							JSON.stringify({
-								error: 'Failed to fetch models',
-								details: error instanceof Error ? error.message : 'Unknown error',
-							}),
-							{
-								status: 500,
-								headers: {
-									'Access-Control-Allow-Origin': '*',
-									'Access-Control-Allow-Methods': 'GET, HEAD, POST, OPTIONS',
-									'Access-Control-Allow-Headers': '*',
-									'Content-Type': 'application/json',
-								},
-							}
-						);
-						response.headers.append('Vary', 'Origin');
-						return response;
-					}
+					return await handleModelListing(env);
+				}
+
+				if (path === '/v1/voice/transcribe' && request.method === 'POST') {
+					return await handleVoiceTranscription(request, env);
+				}
+
+				if (path === '/v1/voice/query' && request.method === 'POST') {
+					return await handleVoiceQuery(request, env, langfuse);
+				}
+
+				if (path === '/v1/text-to-speech' && request.method === 'POST') {
+					return await handleTextToSpeech(request, env, langfuse);
+				}
+
+				if (path === '/v1/voice/chat' && request.method === 'POST') {
+					return await handleVoiceChat(request, env, langfuse);
 				}
 
-				const response = new Response('not found', {
-					status: 404,
-					headers: {
-						'Access-Control-Allow-Origin': '*',
-						'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
-						'Access-Control-Allow-Headers': '*',
-						'Access-Control-Max-Age': '86400',
-					},
-				});
-				response.headers.append('Vary', 'Origin');
-				return response;
+				// TODO: wire up once handlers/voice-ws.ts is implemented
+				// if (path === '/v1/tts-ws' && upgradeHeader === 'websocket') {
+				//	return await handleTTSWebSocketUpgrade(request, env);
+				// }
+
+				return createErrorResponse(404, 'not found');
 			} catch (error) {
 				console.error('error in fetch:', error);
-				const response = new Response('an error occurred', {
-					status: 500,
-					headers: {
-						'Access-Control-Allow-Origin': '*',
-						'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
-						'Access-Control-Allow-Headers': '*',
-						'Access-Control-Max-Age': '86400',
-					},
-				});
-				response.headers.append('Vary', 'Origin');
-				return response;
+				return createErrorResponse(500, 'an error occurred');
 			} finally {
 				await langfuse.shutdownAsync();
 			}
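For completeness, a client sketch for the model-listing route the router now delegates to. URL and token are placeholders; note that providers which failed are silently dropped server-side via `Promise.allSettled`:

```ts
// hypothetical call to GET /v1/models
interface ModelInfo {
	id: string;
	name: string;
	provider: string;
}

async function listModels(): Promise<ModelInfo[]> {
	const res = await fetch('https://ai-proxy.example.workers.dev/v1/models', {
		headers: { Authorization: 'Bearer <token>' },
	});
	if (!res.ok) throw new Error(`listing failed: ${res.status}`);
	const { models } = (await res.json()) as { models: ModelInfo[] };
	return models;
}
```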
diff --git a/screenpipe-js/ai-proxy/src/providers/openai.ts b/screenpipe-js/ai-proxy/src/providers/openai.ts
index f90c9c87af..bd23e586f1 100644
--- a/screenpipe-js/ai-proxy/src/providers/openai.ts
+++ b/screenpipe-js/ai-proxy/src/providers/openai.ts
@@ -68,43 +68,57 @@
 			messages: this.formatMessages(body.messages),
 			temperature: body.temperature,
 			stream: true,
-			response_format:
-				body.response_format?.type === 'json_object'
-					? { type: 'json_object' }
-					: body.response_format?.type === 'json_schema'
-					? {
-							type: 'json_schema',
-							json_schema: {
-								schema: body.response_format.schema!,
-								name: body.response_format.name || 'default',
-								strict: true,
-							},
-					  }
-					: undefined,
+			response_format: this.formatResponseFormat(body.response_format),
 			tools: body.tools as ChatCompletionCreateParams['tools'],
 		});
 
 		return new ReadableStream({
 			async start(controller) {
 				try {
					// deltas are forwarded identically regardless of response_format
 					for await (const chunk of stream) {
 						const content = chunk.choices[0]?.delta?.content;
 						if (content) {
 							controller.enqueue(
 								new TextEncoder().encode(
 									`data: ${JSON.stringify({
 										choices: [{ delta: { content } }],
 									})}\n\n`
 								)
 							);
 						}
 					}
+					controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'));
 					controller.close();
 				} catch (error) {
-					controller.error(error);
+					console.error('Streaming error:', error);
+					if (error instanceof Error) {
+						if (error.name === 'APIConnectionTimeoutError' || error.name === 'APIConnectionError') {
+							controller.error(new Error('Connection error or timeout'));
+						} else {
+							controller.error(error);
+						}
+					} else {
+						controller.error(new Error('Unknown error during streaming'));
+					}
 				}
 			},
+			cancel() {
+				stream.controller.abort();
+			},
 		});
 	}
diff --git a/screenpipe-js/ai-proxy/src/services/analytics.ts b/screenpipe-js/ai-proxy/src/services/analytics.ts
new file mode 100644
index 0000000000..0ec4b67ecf
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/services/analytics.ts
@@ -0,0 +1,22 @@
+import { Langfuse } from 'langfuse-node';
+import { Env } from '../types';
+
+/**
+ * Sets up and configures the analytics service
+ * @param env Environment variables
+ * @returns Configured Langfuse instance
+ */
+export function setupAnalytics(env: Env): Langfuse {
+	const langfuse = new Langfuse({
+		publicKey: env.LANGFUSE_PUBLIC_KEY,
+		secretKey: env.LANGFUSE_SECRET_KEY,
+		baseUrl: 'https://us.cloud.langfuse.com',
+	});
+
+	langfuse.debug();
+	langfuse.on('error', (error) => {
+		console.error('langfuse error:', error);
+	});
+
+	return langfuse;
+}
\ No newline at end of file
diff --git a/screenpipe-js/ai-proxy/src/services/deepgram.ts b/screenpipe-js/ai-proxy/src/services/deepgram.ts
new file mode 100644
index 0000000000..d9a5990ddf
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/services/deepgram.ts
@@ -0,0 +1,11 @@
+import { createClient, DeepgramClient } from '@deepgram/sdk';
+import { Env } from '../types';
+
+// lazily-created singleton so the client is built once per isolate
+let deepgramClientInstance: DeepgramClient | null = null;
+
+export function getDeepgramClient(env: Env): DeepgramClient {
+	if (!deepgramClientInstance) {
+		deepgramClientInstance = createClient(env.DEEPGRAM_API_KEY);
+	}
+	return deepgramClientInstance;
+}
\ No newline at end of file
diff --git a/screenpipe-js/ai-proxy/src/types.ts b/screenpipe-js/ai-proxy/src/types.ts
index e3e38aed91..2addcfeb06 100644
--- a/screenpipe-js/ai-proxy/src/types.ts
+++ b/screenpipe-js/ai-proxy/src/types.ts
@@ -125,3 +125,157 @@ export interface Env {
 	SUPABASE_ANON_KEY: string;
 	NODE_ENV: string;
 }
+
+export interface ResponseUtils {
+	createSuccessResponse: (body: string | object, status?: number) => Response;
+	createErrorResponse: (status: number, message: string) => Response;
+}
+
+// Supported audio file formats
+export type AudioFormat = 'wav' | 'mp3' | 'flac' | 'ogg' | 'webm';
+
+// Supported content types for audio
+export type AudioContentType =
+	| 'audio/wav'
+	| 'audio/mpeg'
+	| 'audio/flac'
+	| 'audio/ogg'
+	| 'audio/webm';
+
+// supported deepgram transcription models
+export type TranscriptionModelType =
+	| 'nova-2'
+	| 'nova-3'
+	| 'enhanced'
+	| 'whisper';
+
+// supported deepgram TTS voice models
+export type TTSVoiceModelType =
+	| 'aura-asteria-en'
+	| 'aura-luna-en'
+	| 'aura-stella-en'
+	| 'aura-athena-en'
+	| 'aura-hera-en'
+	| 'aura-orion-en'
+	| 'aura-arcas-en'
+	| 'aura-perseus-en'
+	| 'aura-angus-en'
+	| 'aura-orpheus-en'
+	| 'aura-helios-en'
+	| 'aura-zeus-en';
+
+export type AudioEncodingType =
+	| 'linear16' // WAV format
+	| 'mp3'; // MP3 format
+
+export interface TranscriptionOptions {
+	model?: TranscriptionModelType;
+	language?: string;
+	detectLanguage?: boolean;
+	languages?: string[];
+	smartFormat?: boolean;
+	diarize?: boolean;
+	punctuate?: boolean;
+	sampleRate?: string;
+}
+
+export interface TextToSpeechRequest {
+	text: string;
+	voice?: TTSVoiceModelType;
+}
+
+export interface TTSOptions {
+	voice?: TTSVoiceModelType;
+	encoding?: AudioEncodingType;
+}
+
+export interface TTSWebSocketOptions {
+	model: TTSVoiceModelType;
+	encoding: AudioEncodingType;
+	sampleRate: number;
+}
+
+export interface TranscriptionResult {
+	text: string;
+	confidence: number;
+	language?: string;
+	words?: Array<{
+		word: string;
+		start: number;
+		end: number;
+		confidence: number;
+		punctuated_word?: string;
+	}>;
+	error?: string;
+}
+
+export interface VoiceQueryResult {
+	transcription: string;
+	transcription_details?: {
+		confidence: number;
+		language?: string;
+		words?: any[];
+	};
+	ai_response: any;
+}
+
+export interface TTSBaseMessage {
+	type: string;
+}
+
+export interface TTSSpeakMessage extends TTSBaseMessage {
+	type: 'Speak';
+	text: string;
+}
+
+export interface TTSFlushMessage extends TTSBaseMessage {
+	type: 'Flush';
+}
+
+export interface TTSClearMessage extends TTSBaseMessage {
+	type: 'Clear';
+}
+
+export interface TTSCloseMessage extends TTSBaseMessage {
+	type: 'Close';
+}
+
+export interface TTSFlushedResponse {
+	type: 'Flushed';
+	sequence_id: number;
+}
+
+export interface TTSClearedResponse {
+	type: 'Cleared';
+	sequence_id: number;
+}
+
+export interface TTSMetadataResponse {
+	type: 'Metadata';
+	request_id: string;
+}
+
+export interface TTSErrorResponse {
+	type: 'Error';
+	err_code: string;
+	err_msg: string;
+}
+
+export interface TTSWarningResponse {
+	type: 'Warning';
+	warn_code: string;
+	warn_msg: string;
+}
+
+export type TTSWebSocketMessage =
+	| TTSSpeakMessage
+	| TTSFlushMessage
+	| TTSClearMessage
+	| TTSCloseMessage;
+
+export type TTSWebSocketResponse =
+	| TTSFlushedResponse
+	| TTSClearedResponse
+	| TTSMetadataResponse
+	| TTSErrorResponse
+	| TTSWarningResponse;
\ No newline at end of file
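The TTS WebSocket handler (`handlers/voice-ws.ts`) is still a TODO, but the discriminated unions above already pin down the intended protocol. A sketch of the client side, assuming a `/v1/tts-ws` endpoint and the `./types` import path (both placeholders until the handler lands):

```ts
import type { TTSSpeakMessage, TTSFlushMessage, TTSWebSocketResponse } from './types';

function speakOverSocket(ws: WebSocket, text: string): void {
	const speak: TTSSpeakMessage = { type: 'Speak', text };
	const flush: TTSFlushMessage = { type: 'Flush' };
	ws.send(JSON.stringify(speak));
	ws.send(JSON.stringify(flush)); // ask the server to synthesize everything buffered so far

	ws.addEventListener('message', (event) => {
		if (typeof event.data !== 'string') return; // binary frames carry audio
		const msg = JSON.parse(event.data) as TTSWebSocketResponse;
		switch (msg.type) {
			case 'Flushed':
				console.log('audio complete, sequence', msg.sequence_id);
				break;
			case 'Error':
				console.error(`${msg.err_code}: ${msg.err_msg}`);
				break;
			default:
				break; // Metadata, Cleared, Warning: ignored in this sketch
		}
	});
}
```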
diff --git a/screenpipe-js/ai-proxy/src/utils/auth.ts b/screenpipe-js/ai-proxy/src/utils/auth.ts
new file mode 100644
index 0000000000..4acf218803
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/utils/auth.ts
@@ -0,0 +1,51 @@
+import { verifyToken } from '@clerk/backend';
+import { Env } from '../types';
+import { validateSubscription } from './subscription';
+
+/**
+ * Verifies a JWT token from Clerk
+ * @param env Environment variables
+ * @param token JWT token to verify
+ * @returns Promise resolving to boolean indicating if token is valid
+ */
+export async function verifyClerkToken(env: Env, token: string): Promise<boolean> {
+	// avoid logging the raw token: it is a bearer credential
+	console.log('verifying clerk token');
+	try {
+		const payload = await verifyToken(token, {
+			secretKey: env.CLERK_SECRET_KEY,
+		});
+		return payload.sub !== null;
+	} catch (error) {
+		console.error('clerk verification failed:', error);
+		return false;
+	}
+}
+
+/**
+ * Validates user authentication from request headers
+ * @param request HTTP request
+ * @param env Environment variables
+ * @returns Object with validation result and optional error message
+ */
+export async function validateAuth(request: Request, env: Env): Promise<{ isValid: boolean; error?: string }> {
+	const authHeader = request.headers.get('Authorization');
+
+	if (!authHeader || !(authHeader.startsWith('Bearer ') || authHeader.startsWith('Token '))) {
+		return { isValid: false, error: 'unauthorized' };
+	}
+
+	const token = authHeader.split(' ')[1];
+	let isValid = await validateSubscription(env, token);
+
+	// If not valid, try to verify as a clerk token
+	if (!isValid) {
+		isValid = await verifyClerkToken(env, token);
+	}
+
+	if (!isValid) {
+		console.log('all validation attempts failed');
+		return { isValid: false, error: 'invalid subscription' };
+	}
+
+	return { isValid: true };
+}
\ No newline at end of file
diff --git a/screenpipe-js/ai-proxy/src/utils/cors.ts b/screenpipe-js/ai-proxy/src/utils/cors.ts
new file mode 100644
index 0000000000..20e540d109
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/utils/cors.ts
@@ -0,0 +1,97 @@
+/**
+ * Adds CORS headers to a response
+ * @param response The response to add headers to; note that responses obtained
+ * directly from fetch() have immutable headers on Workers and may need cloning first
+ * @returns The response with CORS headers added
+ */
+export function addCorsHeaders(response: Response): Response {
+	response.headers.set('Access-Control-Allow-Origin', '*');
+	response.headers.set('Access-Control-Allow-Methods', 'GET, HEAD, POST, OPTIONS');
+	response.headers.set('Access-Control-Allow-Headers', '*');
+	response.headers.set('Access-Control-Allow-Credentials', 'true');
+	response.headers.set('Access-Control-Max-Age', '86400');
+	response.headers.append('Vary', 'Origin');
+	return response;
+}
+
+/**
+ * Handles OPTIONS requests for CORS preflight
+ * @param request The request object
+ * @returns A response for the OPTIONS request
+ */
+export function handleOptions(request: Request): Response {
+	const corsHeaders = {
+		'Access-Control-Allow-Origin': '*',
+		'Access-Control-Allow-Methods': 'GET, HEAD, POST, OPTIONS',
+		'Access-Control-Allow-Headers': '*',
+		'Access-Control-Max-Age': '86400',
+	};
+
+	if (
+		request.headers.get('Origin') !== null &&
+		request.headers.get('Access-Control-Request-Method') !== null &&
+		request.headers.get('Access-Control-Request-Headers') !== null
+	) {
+		return new Response(null, {
+			headers: {
+				...corsHeaders,
+				'Access-Control-Allow-Headers': request.headers.get('Access-Control-Request-Headers') || '*',
+			},
+		});
+	}
+
+	return new Response(null, {
+		headers: {
+			Allow: 'GET, HEAD, POST, OPTIONS',
+		},
+	});
+}
+
+/**
+ * Creates a standardized success response with CORS headers
+ * @param body The response body (string or object)
+ * @param status The HTTP status code (default: 200)
+ * @returns A Response object with CORS headers
+ */
+export function createSuccessResponse(body: string | object, status = 200): Response {
+	const responseBody = typeof body === 'string' ? body : JSON.stringify(body);
+	const contentType = typeof body === 'string' ? 'text/plain' : 'application/json';
+
+	const response = new Response(responseBody, {
+		status,
+		headers: {
+			'Access-Control-Allow-Origin': '*',
+			'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
+			'Access-Control-Allow-Headers': '*',
+			'Access-Control-Allow-Credentials': 'true',
+			'Access-Control-Max-Age': '86400',
+			'Content-Type': contentType,
+		},
+	});
+	response.headers.append('Vary', 'Origin');
+	return response;
+}
+
+/**
+ * Creates a standardized error response with CORS headers
+ * @param status The HTTP status code
+ * @param message The error message
+ * @returns A Response object with CORS headers
+ */
+export function createErrorResponse(status: number, message: string): Response {
+	const response = new Response(
+		JSON.stringify({ error: message }),
+		{
+			status,
+			headers: {
+				'Access-Control-Allow-Origin': '*',
+				'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
+				'Access-Control-Allow-Headers': '*',
+				'Access-Control-Allow-Credentials': 'true',
+				'Access-Control-Max-Age': '86400',
+				'Content-Type': 'application/json',
+			},
+		}
+	);
+	response.headers.append('Vary', 'Origin');
+	return response;
+}
\ No newline at end of file
diff --git a/screenpipe-js/ai-proxy/src/utils/rate-limiter.ts b/screenpipe-js/ai-proxy/src/utils/rate-limiter.ts
new file mode 100644
index 0000000000..ea759a00a1
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/utils/rate-limiter.ts
@@ -0,0 +1,78 @@
+import { createErrorResponse } from './cors';
+import { Env } from '../types';
+
+export class RateLimiter {
+	private state: DurableObjectState;
+	private requests: Map<string, { count: number; lastReset: number }>;
+
+	constructor(state: DurableObjectState) {
+		this.state = state;
+		this.requests = new Map();
+	}
+
+	/**
+	 * Handles fetch requests to check and update rate limits
+	 * @param request The HTTP request
+	 * @returns Response with rate limit status
+	 */
+	async fetch(request: Request) {
+		const ip = request.headers.get('cf-connecting-ip') || 'unknown';
+		const url = new URL(request.url);
+		const now = Date.now();
+
+		// provisional per-endpoint limits; values still to be tuned
+		const limits: Record<string, { rpm: number; window: number }> = {
+			'/v1/chat/completions': { rpm: 20, window: 60000 }, // 20 requests per minute for openai
+			'/v1/voice/transcribe': { rpm: 15, window: 60000 }, // 15 requests per minute for voice transcription
+			'/v1/voice/query': { rpm: 10, window: 60000 }, // 10 requests per minute for voice queries
+			'/v1/text-to-speech': { rpm: 15, window: 60000 }, // 15 requests per minute for text-to-speech
+			'/v1/voice/chat': { rpm: 8, window: 60000 }, // 8 requests per minute for voice chat
+			default: { rpm: 60, window: 60000 }, // 60 rpm for other endpoints
+		};
+
+		const limit = limits[url.pathname] || limits.default;
+
+		let tracking = this.requests.get(ip) || { count: 0, lastReset: now };
+
+		// fixed window: reset the counter once the window has elapsed
+		if (now - tracking.lastReset > limit.window) {
+			tracking = { count: 0, lastReset: now };
+		}
+
+		tracking.count++;
+		this.requests.set(ip, tracking);
+
+		const isAllowed = tracking.count <= limit.rpm;
+
+		return new Response(
+			JSON.stringify({
+				allowed: isAllowed,
+				remaining: Math.max(0, limit.rpm - tracking.count),
+				reset_in: Math.ceil((tracking.lastReset + limit.window - now) / 1000),
+			})
+		);
+	}
+}
+
+/**
+ * Checks if the request exceeds rate limits
+ * @param request The HTTP request
+ * @param env Environment variables
+ * @returns Object indicating if request is allowed and optional error response
+ */
+export async function checkRateLimit(request: Request, env: Env): Promise<{ allowed: boolean; response?: Response }> {
+	// the durable object instance is keyed by client IP, so each IP gets its own counters;
+	// only the URL is forwarded to the stub, so the ip lookup inside fetch() falls back to 'unknown'
+	const ip = request.headers.get('cf-connecting-ip') || 'unknown';
+	const rateLimiterId = env.RATE_LIMITER.idFromName(ip);
+	const rateLimiter = env.RATE_LIMITER.get(rateLimiterId);
+
+	const rateLimitResponse = await rateLimiter.fetch(request.url);
+	const rateLimitData = (await rateLimitResponse.json()) as { allowed: boolean; remaining: number; reset_in: number };
+
+	if (!rateLimitData.allowed) {
+		return {
+			allowed: false,
+			response: createErrorResponse(429, 'rate limit exceeded'),
+		};
+	}
+
+	return { allowed: true };
+}
\ No newline at end of file
diff --git a/screenpipe-js/ai-proxy/src/utils/subscription.ts b/screenpipe-js/ai-proxy/src/utils/subscription.ts
new file mode 100644
index 0000000000..b5227abce4
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/utils/subscription.ts
@@ -0,0 +1,79 @@
+import { Env } from '../types';
+
+/**
+ * Cache for subscription status to reduce API calls
+ */
+class SubscriptionCache {
+	private cache: Map<string, { isValid: boolean; timestamp: number }>;
+	private readonly CACHE_TTL = 5 * 60 * 1000; // 5 minutes in milliseconds
+
+	constructor() {
+		this.cache = new Map();
+	}
+
+	get(userId: string): boolean | null {
+		const entry = this.cache.get(userId);
+		if (!entry) return null;
+
+		if (Date.now() - entry.timestamp > this.CACHE_TTL) {
+			this.cache.delete(userId);
+			return null;
+		}
+
+		return entry.isValid;
+	}
+
+	set(userId: string, isValid: boolean) {
+		this.cache.set(userId, {
+			isValid,
+			timestamp: Date.now(),
+		});
+	}
+}
+
+export const subscriptionCache = new SubscriptionCache();
+
+/**
+ * Validates if a user has an active subscription
+ * @param env Environment variables
+ * @param userId User ID to validate
+ * @returns Promise resolving to boolean indicating if subscription is valid
+ */
+export async function validateSubscription(env: Env, userId: string): Promise<boolean> {
+	console.log('validating user id has cloud sub', userId);
+	// Check cache first
+	const cached = subscriptionCache.get(userId);
+	if (cached !== null) {
+		return cached;
+	}
+
+	const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
+
+	if (UUID_REGEX.test(userId)) {
+		try {
+			const response = await fetch(`${env.SUPABASE_URL}/rest/v1/rpc/has_active_cloud_subscription`, {
+				method: 'POST',
+				headers: {
+					apikey: env.SUPABASE_ANON_KEY,
+					Authorization: `Bearer ${env.SUPABASE_ANON_KEY}`,
+					'Content-Type': 'application/json',
+				},
+				body: JSON.stringify({ input_user_id: userId }),
+			});
+
+			if (!response.ok) {
+				console.error('Supabase error:', await response.text());
+				return false;
+			}
+
+			const isValid: boolean = await response.json();
+			subscriptionCache.set(userId, isValid);
+			return isValid;
+		} catch (error) {
+			console.error('Error checking subscription:', error);
+			return false;
+		}
+	}
+
+	// not a UUID: fall through so Clerk verification can run
+	return false;
+}
\ No newline at end of file
diff --git a/screenpipe-js/ai-proxy/src/utils/voice-utils.ts b/screenpipe-js/ai-proxy/src/utils/voice-utils.ts
new file mode 100644
index 0000000000..d5837a9f3b
--- /dev/null
+++ b/screenpipe-js/ai-proxy/src/utils/voice-utils.ts
@@ -0,0 +1,219 @@
+import { Env, TranscriptionOptions, TranscriptionResult, TTSOptions, AudioFormat, AudioContentType } from '../types';
+import { createClient } from '@deepgram/sdk';
+
+/**
+ * Determines the content type based on file format
+ * @param format Audio format
+ * @returns Content type string
+ */
+export function getContentType(format: AudioFormat): AudioContentType {
+	switch (format) {
+		case 'wav':
+			return 'audio/wav';
+		case 'mp3':
+			return 'audio/mpeg';
+		case 'flac':
+			return 'audio/flac';
+		case 'ogg':
+			return 'audio/ogg';
+		case 'webm':
+			return 'audio/webm';
+		default:
+			return 'audio/wav';
+	}
+}
+
+/**
+ * Validates that the request contains valid audio data
+ * @param request Request containing audio data
+ * @returns Validation result with audio buffer if valid
+ */
+export async function validateAudioInput(request: Request): Promise<{
+	valid: boolean;
+	audioBuffer?: ArrayBuffer;
+	contentType?: AudioContentType;
+	error?: string;
+}> {
+	try {
+		const contentType = request.headers.get('Content-Type');
+
+		if (!contentType || !contentType.includes('audio/')) {
+			return {
+				valid: false,
+				error: 'Invalid content type. Expected audio file.',
+			};
+		}
+
+		let validContentType: AudioContentType | undefined;
+		if (contentType.includes('audio/wav') || contentType.includes('audio/x-wav')) {
+			validContentType = 'audio/wav';
+		} else if (contentType.includes('audio/mpeg')) {
+			validContentType = 'audio/mpeg';
+		} else if (contentType.includes('audio/flac')) {
+			validContentType = 'audio/flac';
+		} else if (contentType.includes('audio/ogg')) {
+			validContentType = 'audio/ogg';
+		} else if (contentType.includes('audio/webm')) {
+			validContentType = 'audio/webm';
+		} else {
+			return {
+				valid: false,
+				error: `Unsupported audio format: ${contentType}. Supported formats: wav, mp3, flac, ogg, webm.`,
+			};
+		}
+
+		const audioBuffer = await request.arrayBuffer();
+
+		if (!audioBuffer || audioBuffer.byteLength === 0) {
+			return {
+				valid: false,
+				error: 'Empty audio file received.',
+			};
+		}
+
+		// check file size (limit to 10MB)
+		if (audioBuffer.byteLength > 10 * 1024 * 1024) {
+			return {
+				valid: false,
+				error: 'Audio file too large. Maximum size is 10MB.',
+			};
+		}
+
+		return {
+			valid: true,
+			audioBuffer,
+			contentType: validContentType,
+		};
+	} catch (error: any) {
+		return {
+			valid: false,
+			error: `Error processing audio: ${error.message}`,
+		};
+	}
+}
+
+/**
+ * Transcribes audio data to text using Deepgram's API
+ * @param audioBuffer Audio data to transcribe
+ * @param env Environment variables containing API keys
+ * @param options Transcription options
+ * @returns Transcription result with text and metadata
+ */
+export async function transcribeAudio(
+	audioBuffer: ArrayBuffer,
+	env: Env,
+	options: TranscriptionOptions = {}
+): Promise<TranscriptionResult> {
+	try {
+		// Set up default options
+		const defaultOptions: Required<TranscriptionOptions> = {
+			model: 'nova-3',
+			language: 'en',
+			detectLanguage: false,
+			languages: [],
+			smartFormat: true,
+			diarize: false,
+			punctuate: true,
+			sampleRate: '16000',
+		};
+
+		const mergedOptions = { ...defaultOptions, ...options };
+
+		if (mergedOptions.languages && mergedOptions.languages.length > 0) {
+			mergedOptions.detectLanguage = mergedOptions.languages.length > 1;
+			mergedOptions.language = mergedOptions.languages[0];
+		}
+
+		const deepgramClient = createClient(env.DEEPGRAM_API_KEY);
+
+		// Buffer requires the nodejs_compat flag when running on Cloudflare Workers
+		const buffer = Buffer.from(audioBuffer);
+
+		const transcriptionOptions = {
+			model: mergedOptions.model,
+			smart_format: mergedOptions.smartFormat,
+			diarize: mergedOptions.diarize,
+			language: mergedOptions.language,
+			detect_language: mergedOptions.detectLanguage,
+			punctuate: mergedOptions.punctuate,
+			sample_rate: mergedOptions.sampleRate,
+		};
+
+		console.log(`Transcribing audio with model ${mergedOptions.model}, language: ${mergedOptions.language}`);
+
+		const { result, error } = await deepgramClient.listen.prerecorded.transcribeFile(buffer, transcriptionOptions);
+
+		if (error) {
+			throw new Error(`Deepgram transcription error: ${error.message}`);
+		}
+
+		const transcription = result?.results?.channels[0]?.alternatives[0]?.transcript || '';
+		const confidence = result?.results?.channels[0]?.alternatives[0]?.confidence || 0;
+		const words = result?.results?.channels[0]?.alternatives[0]?.words || [];
+		const detectedLanguage = result?.results?.channels[0]?.detected_language;
+
+		return {
+			text: transcription,
+			confidence,
+			language: detectedLanguage,
+			words,
+		};
+	} catch (error: any) {
+		console.error('Transcription error:', error);
+		return {
+			text: '',
+			confidence: 0,
+			error: error.message,
+		};
+	}
+}
+
+/**
+ * Converts text to speech using Deepgram's REST API
+ * @param text Text to convert to speech
+ * @param env Environment variables containing API keys
+ * @param options Text-to-speech options
+ * @returns Audio buffer of the synthesized speech or null if error
+ */
+export async function textToSpeech(text: string, env: Env, options: TTSOptions = {}): Promise<ArrayBuffer | null> {
+	try {
+		if (!text || text.trim() === '') {
+			throw new Error('Empty text provided for text-to-speech conversion');
+		}
+
+		const voice = options.voice || 'aura-asteria-en';
+		const encoding = options.encoding || 'linear16';
+
+		console.log(`Converting text to speech using voice: ${voice}, encoding: ${encoding}`);
+		console.log(`Text length: ${text.length} characters`);
+
+		const url = new URL('https://api.deepgram.com/v1/speak');
+		url.searchParams.set('model', voice);
+
+		if (encoding !== 'linear16') {
+			url.searchParams.set('encoding', encoding);
+		}
+
+		const response = await fetch(url.toString(), {
+			method: 'POST',
+			headers: {
+				Authorization: `Token ${env.DEEPGRAM_API_KEY}`,
+				'Content-Type': 'application/json',
+			},
+			body: JSON.stringify({ text }),
+		});
+
+		if (!response.ok) {
+			const errorText = await response.text();
+			console.error(`TTS API error ${response.status}: ${errorText}`);
+			throw new Error(`Deepgram TTS error: ${errorText}`);
+		}
+
+		const audioBuffer = await response.arrayBuffer();
+		console.log(`Received audio response: ${audioBuffer.byteLength} bytes`);
+
+		return audioBuffer;
+	} catch (error: any) {
+		console.error('Error in text-to-speech:', error);
+		return null;
+	}
+}
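Finally, a client sketch for the REST text-to-speech route this utility backs. URL, token, and output path are placeholders; with the default `linear16` encoding the response body is WAV bytes:

```ts
// hypothetical client for POST /v1/text-to-speech; runs under Node 18+
import { writeFile } from 'node:fs/promises';

async function synthesize(text: string, outPath: string): Promise<void> {
	const res = await fetch('https://ai-proxy.example.workers.dev/v1/text-to-speech', {
		method: 'POST',
		headers: { Authorization: 'Bearer <token>', 'Content-Type': 'application/json' },
		body: JSON.stringify({ text, voice: 'aura-asteria-en' }),
	});
	if (!res.ok) throw new Error(`tts failed: ${res.status}`);
	await writeFile(outPath, Buffer.from(await res.arrayBuffer()));
}
```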