From dd7c4675d32eaa516907b0798694029c115a17b7 Mon Sep 17 00:00:00 2001
From: Timothy Carambat
Date: Mon, 16 Dec 2024 14:31:17 -0800
Subject: [PATCH] LLM performance metric tracking (#2825)

* WIP performance metric tracking
* fix: patch UI trying to .toFixed() null metric
Anthropic tracking migration cleanup logs
* Apipie implementation, not tested
* Cleanup Anthropic notes, Add support for AzureOpenAI tracking
* bedrock token metric tracking
* Cohere support
* feat: improve default stream handler to track for providers that are actually OpenAI compliant in usage reporting
add deepseek support
* feat: Add FireworksAI tracking reporting
fix: improve handler when usage:null is reported (why?)
* Add token reporting for GenericOpenAI
* token reporting for koboldcpp + lmstudio
* lint
* support Groq token tracking
* HF token tracking
* token tracking for togetherai
* LiteLLM token tracking
* linting + Mistral token tracking support
* XAI token metric reporting
* native provider runner
* LocalAI token tracking
* Novita token tracking
* OpenRouter token tracking
* Apipie stream metrics
* textwebgenui token tracking
* perplexity token reporting
* ollama token reporting
* lint
* put back comment
* Rip out LC ollama wrapper and use official library
* patch images with new ollama lib
* improve ollama offline message
* fix image handling in ollama llm provider
* lint
* NVIDIA NIM token tracking
* update openai compatibility responses
* UI/UX show/hide metrics on click for user preference
* update bedrock client

---------

Co-authored-by: shatfield4
---
 .../Actions/RenderMetrics/index.jsx | 118 +++++++++
 .../HistoricalMessage/Actions/index.jsx | 3 +
 .../ChatHistory/HistoricalMessage/index.jsx | 2 +
 .../ChatContainer/ChatHistory/index.jsx | 1 +
 .../ChatContainer/ChatTooltips/index.jsx | 6 +
 .../WorkspaceChat/ChatContainer/index.jsx | 19 +-
 frontend/src/utils/chat/index.js | 49 +++-
 server/.env.example | 4 +
 server/endpoints/api/admin/index.js | 48 ++--
 server/utils/AiProviders/anthropic/index.js | 79 ++++--
 server/utils/AiProviders/apipie/index.js | 73 ++++--
 server/utils/AiProviders/azureOpenAi/index.js | 62 ++++-
 server/utils/AiProviders/bedrock/index.js | 75 ++++--
 server/utils/AiProviders/cohere/index.js | 80 +++++--
 server/utils/AiProviders/deepseek/index.js | 57 +++--
 server/utils/AiProviders/fireworksAi/index.js | 48 +++-
 server/utils/AiProviders/gemini/index.js | 57 ++++-
 .../utils/AiProviders/genericOpenAi/index.js | 64 +++--
 server/utils/AiProviders/groq/index.js | 59 +++--
 server/utils/AiProviders/huggingface/index.js | 54 +++--
 server/utils/AiProviders/koboldCPP/index.js | 80 +++++--
 server/utils/AiProviders/liteLLM/index.js | 65 +++--
 server/utils/AiProviders/lmStudio/index.js | 49 ++--
 server/utils/AiProviders/localAi/index.js | 54 +++--
 server/utils/AiProviders/mistral/index.js | 52 +++-
 server/utils/AiProviders/native/index.js | 49 +++-
 server/utils/AiProviders/novita/index.js | 80 +++++--
 server/utils/AiProviders/nvidiaNim/index.js | 53 +++--
 server/utils/AiProviders/ollama/index.js | 225 +++++++++++-------
 server/utils/AiProviders/openAi/index.js | 60 +++--
 server/utils/AiProviders/openRouter/index.js | 89 +++++--
 server/utils/AiProviders/perplexity/index.js | 57 +++--
 .../utils/AiProviders/textGenWebUI/index.js | 57 +++--
 server/utils/AiProviders/togetherAi/index.js | 52 +++-
 server/utils/AiProviders/xai/index.js | 57 +++--
 server/utils/chats/apiChatHandler.js | 42 +++-
 server/utils/chats/embed.js | 13 +-
 server/utils/chats/openaiCompatible.js | 44 +++-
 server/utils/chats/stream.js | 23 +-
 .../helpers/chat/LLMPerformanceMonitor.js | 101 ++++++++
 server/utils/helpers/chat/responses.js | 48 +++-
 server/utils/helpers/index.js | 28 ++-
 42 files changed, 1770 insertions(+), 566 deletions(-)
 create mode 100644 frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/Actions/RenderMetrics/index.jsx
 create mode 100644 server/utils/helpers/chat/LLMPerformanceMonitor.js

diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/Actions/RenderMetrics/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/Actions/RenderMetrics/index.jsx
new file mode 100644
index 0000000000..d5f99c18ff
--- /dev/null
+++ b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/Actions/RenderMetrics/index.jsx
@@ -0,0 +1,118 @@
+import { numberWithCommas } from "@/utils/numbers";
+import React, { useEffect, useState, useContext } from "react";
+const MetricsContext = React.createContext();
+const SHOW_METRICS_KEY = "anythingllm_show_chat_metrics";
+const SHOW_METRICS_EVENT = "anythingllm_show_metrics_change";
+
+/**
+ * @param {number} duration - duration in milliseconds
+ * @returns {string}
+ */
+function formatDuration(duration) {
+  try {
+    return duration < 1
+      ? `${(duration * 1000).toFixed(0)}ms`
+      : `${duration.toFixed(3)}s`;
+  } catch {
+    return "";
+  }
+}
+
+/**
+ * Format the output TPS to a string
+ * @param {number} outputTps - output TPS
+ * @returns {string}
+ */
+function formatTps(outputTps) {
+  try {
+    return outputTps < 1000
+      ? outputTps.toFixed(2)
+      : numberWithCommas(outputTps.toFixed(0));
+  } catch {
+    return "";
+  }
+}
+
+/**
+ * Get the show metrics setting from localStorage `anythingllm_show_chat_metrics` key
+ * @returns {boolean}
+ */
+function getAutoShowMetrics() {
+  return window?.localStorage?.getItem(SHOW_METRICS_KEY) === "true";
+}
+
+/**
+ * Toggle the show metrics setting in localStorage `anythingllm_show_chat_metrics` key
+ * @returns {void}
+ */
+function toggleAutoShowMetrics() {
+  const currentValue = getAutoShowMetrics() || false;
+  window?.localStorage?.setItem(SHOW_METRICS_KEY, !currentValue);
+  window.dispatchEvent(
+    new CustomEvent(SHOW_METRICS_EVENT, {
+      detail: { showMetricsAutomatically: !currentValue },
+    })
+  );
+  return !currentValue;
+}
+
+/**
+ * Provider for the metrics context that controls the visibility of the metrics
+ * per-chat based on the user's preference.
+ * @param {React.ReactNode} children + * @returns {React.ReactNode} + */ +export function MetricsProvider({ children }) { + const [showMetricsAutomatically, setShowMetricsAutomatically] = + useState(getAutoShowMetrics()); + + useEffect(() => { + function handleShowingMetricsEvent(e) { + if (!e?.detail?.hasOwnProperty("showMetricsAutomatically")) return; + setShowMetricsAutomatically(e.detail.showMetricsAutomatically); + } + console.log("Adding event listener for metrics visibility"); + window.addEventListener(SHOW_METRICS_EVENT, handleShowingMetricsEvent); + return () => + window.removeEventListener(SHOW_METRICS_EVENT, handleShowingMetricsEvent); + }, []); + + return ( + + {children} + + ); +} + +/** + * Render the metrics for a given chat, if available + * @param {metrics: {duration:number, outputTps: number}} props + * @returns + */ +export default function RenderMetrics({ metrics = {} }) { + // Inherit the showMetricsAutomatically state from the MetricsProvider so the state is shared across all chats + const { showMetricsAutomatically, setShowMetricsAutomatically } = + useContext(MetricsContext); + if (!metrics?.duration || !metrics?.outputTps) return null; + + return ( + + ); +} diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/Actions/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/Actions/index.jsx index 5dc15e1a16..0b05576008 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/Actions/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/Actions/index.jsx @@ -3,6 +3,7 @@ import useCopyText from "@/hooks/useCopyText"; import { Check, ThumbsUp, ArrowsClockwise, Copy } from "@phosphor-icons/react"; import Workspace from "@/models/workspace"; import { EditMessageAction } from "./EditMessage"; +import RenderMetrics from "./RenderMetrics"; import ActionMenu from "./ActionMenu"; const Actions = ({ @@ -15,6 +16,7 @@ const Actions = ({ forkThread, isEditing, role, + metrics = {}, }) => { const [selectedFeedback, setSelectedFeedback] = useState(feedbackScore); const handleFeedback = async (newFeedback) => { @@ -58,6 +60,7 @@ const Actions = ({ /> + ); }; diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/index.jsx index c311dd6eda..fb6166f64c 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/index.jsx @@ -26,6 +26,7 @@ const HistoricalMessage = ({ regenerateMessage, saveEditedMessage, forkThread, + metrics = {}, }) => { const { isEditing } = useEditMessage({ chatId, role }); const { isDeleted, completeDelete, onEndAnimation } = useWatchDeleteMessage({ @@ -117,6 +118,7 @@ const HistoricalMessage = ({ isEditing={isEditing} role={role} forkThread={forkThread} + metrics={metrics} /> {role === "assistant" && } diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/index.jsx index 9354e0312a..9bf8eeebbe 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/index.jsx @@ -227,6 +227,7 @@ export default function ChatHistory({ 
isLastMessage={isLastBotReply} saveEditedMessage={saveEditedMessage} forkThread={forkThread} + metrics={props.metrics} /> ); })} diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/ChatTooltips/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/ChatTooltips/index.jsx index a7bb2ef873..3aa5dc6ccb 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/ChatTooltips/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/ChatTooltips/index.jsx @@ -61,6 +61,12 @@ export function ChatTooltips() { // as the citation modal is z-indexed above the chat history className="tooltip !text-xs z-[100]" /> + ); } diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx index b0ffcbfc88..9fc640c4be 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx @@ -18,6 +18,7 @@ import SpeechRecognition, { useSpeechRecognition, } from "react-speech-recognition"; import { ChatTooltips } from "./ChatTooltips"; +import { MetricsProvider } from "./ChatHistory/HistoricalMessage/Actions/RenderMetrics"; export default function ChatContainer({ workspace, knownHistory = [] }) { const { threadSlug = null } = useParams(); @@ -268,14 +269,16 @@ export default function ChatContainer({ workspace, knownHistory = [] }) { > {isMobile && } - 0} - /> + + 0} + /> + chat.uuid === uuid); if (chatIdx !== -1) { const existingHistory = { ..._chatHistory[chatIdx] }; - const updatedHistory = { - ...existingHistory, - content: existingHistory.content + textResponse, - sources, - error, - closed: close, - animate: !close, - pending: false, - chatId, - }; + let updatedHistory; + + // If the response is finalized, we can set the loading state to false. + // and append the metrics to the history. + if (type === "finalizeResponseStream") { + updatedHistory = { + ...existingHistory, + closed: close, + animate: !close, + pending: false, + chatId, + metrics, + }; + setLoadingResponse(false); + } else { + updatedHistory = { + ...existingHistory, + content: existingHistory.content + textResponse, + sources, + error, + closed: close, + animate: !close, + pending: false, + chatId, + metrics, + }; + } _chatHistory[chatIdx] = updatedHistory; } else { _chatHistory.push({ @@ -101,6 +126,7 @@ export default function handleChat( animate: !close, pending: false, chatId, + metrics, }); } setChatHistory([..._chatHistory]); @@ -125,6 +151,7 @@ export default function handleChat( error: null, animate: false, pending: false, + metrics, }; _chatHistory[chatIdx] = updatedHistory; diff --git a/server/.env.example b/server/.env.example index fb84a9f8d2..3346fc397d 100644 --- a/server/.env.example +++ b/server/.env.example @@ -52,6 +52,10 @@ SIG_SALT='salt' # Please generate random string at least 32 chars long. 
# PERPLEXITY_API_KEY='my-perplexity-key' # PERPLEXITY_MODEL_PREF='codellama-34b-instruct' +# LLM_PROVIDER='deepseek' +# DEEPSEEK_API_KEY=YOUR_API_KEY +# DEEPSEEK_MODEL_PREF='deepseek-chat' + # LLM_PROVIDER='openrouter' # OPENROUTER_API_KEY='my-openrouter-key' # OPENROUTER_MODEL_PREF='openrouter/auto' diff --git a/server/endpoints/api/admin/index.js b/server/endpoints/api/admin/index.js index 18f59ee872..93fbff7669 100644 --- a/server/endpoints/api/admin/index.js +++ b/server/endpoints/api/admin/index.js @@ -610,24 +610,20 @@ function apiAdminEndpoints(app) { const workspaceUsers = await Workspace.workspaceUsers(workspace.id); if (!workspace) { - response - .status(404) - .json({ - success: false, - error: `Workspace ${workspaceSlug} not found`, - users: workspaceUsers, - }); + response.status(404).json({ + success: false, + error: `Workspace ${workspaceSlug} not found`, + users: workspaceUsers, + }); return; } if (userIds.length === 0) { - response - .status(404) - .json({ - success: false, - error: `No valid user IDs provided.`, - users: workspaceUsers, - }); + response.status(404).json({ + success: false, + error: `No valid user IDs provided.`, + users: workspaceUsers, + }); return; } @@ -637,13 +633,11 @@ function apiAdminEndpoints(app) { workspace.id, userIds ); - return response - .status(200) - .json({ - success, - error, - users: await Workspace.workspaceUsers(workspace.id), - }); + return response.status(200).json({ + success, + error, + users: await Workspace.workspaceUsers(workspace.id), + }); } // Add new users to the workspace if they are not already in the workspace @@ -653,13 +647,11 @@ function apiAdminEndpoints(app) { ); if (usersToAdd.length > 0) await WorkspaceUser.createManyUsers(usersToAdd, workspace.id); - response - .status(200) - .json({ - success: true, - error: null, - users: await Workspace.workspaceUsers(workspace.id), - }); + response.status(200).json({ + success: true, + error: null, + users: await Workspace.workspaceUsers(workspace.id), + }); } catch (e) { console.error(e); response.sendStatus(500).end(); diff --git a/server/utils/AiProviders/anthropic/index.js b/server/utils/AiProviders/anthropic/index.js index 7b752da305..f9c4c91c7a 100644 --- a/server/utils/AiProviders/anthropic/index.js +++ b/server/utils/AiProviders/anthropic/index.js @@ -5,6 +5,9 @@ const { } = require("../../helpers/chat/responses"); const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { MODEL_MAP } = require("../modelMap"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); class AnthropicLLM { constructor(embedder = null, modelPreference = null) { @@ -111,18 +114,31 @@ class AnthropicLLM { ); try { - const response = await this.anthropic.messages.create({ - model: this.model, - max_tokens: 4096, - system: messages[0].content, // Strip out the system message - messages: messages.slice(1), // Pop off the system message - temperature: Number(temperature ?? this.defaultTemp), - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.anthropic.messages.create({ + model: this.model, + max_tokens: 4096, + system: messages[0].content, // Strip out the system message + messages: messages.slice(1), // Pop off the system message + temperature: Number(temperature ?? 
this.defaultTemp), + }) + ); - return response.content[0].text; + const promptTokens = result.output.usage.input_tokens; + const completionTokens = result.output.usage.output_tokens; + return { + textResponse: result.output.content[0].text, + metrics: { + prompt_tokens: promptTokens, + completion_tokens: completionTokens, + total_tokens: promptTokens + completionTokens, + outputTps: completionTokens / result.duration, + duration: result.duration, + }, + }; } catch (error) { console.log(error); - return error; + return { textResponse: error, metrics: {} }; } } @@ -132,26 +148,45 @@ class AnthropicLLM { `Anthropic chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.anthropic.messages.stream({ - model: this.model, - max_tokens: 4096, - system: messages[0].content, // Strip out the system message - messages: messages.slice(1), // Pop off the system message - temperature: Number(temperature ?? this.defaultTemp), - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.anthropic.messages.stream({ + model: this.model, + max_tokens: 4096, + system: messages[0].content, // Strip out the system message + messages: messages.slice(1), // Pop off the system message + temperature: Number(temperature ?? this.defaultTemp), + }), + messages, + false + ); + + return measuredStreamRequest; } + /** + * Handles the stream response from the Anthropic API. + * @param {Object} response - the response object + * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - the stream response from the Anthropic API w/tracking + * @param {Object} responseProps - the response properties + * @returns {Promise} + */ handleStream(response, stream, responseProps) { return new Promise((resolve) => { let fullText = ""; const { uuid = v4(), sources = [] } = responseProps; + let usage = { + prompt_tokens: 0, + completion_tokens: 0, + }; // Establish listener to early-abort a streaming response // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. 
- const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement(usage); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); stream.on("error", (event) => { @@ -173,11 +208,18 @@ class AnthropicLLM { error: parseErrorMsg(event), }); response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); resolve(fullText); }); stream.on("streamEvent", (message) => { const data = message; + + if (data.type === "message_start") + usage.prompt_tokens = data?.message?.usage?.input_tokens; + if (data.type === "message_delta") + usage.completion_tokens = data?.usage?.output_tokens; + if ( data.type === "content_block_delta" && data.delta.type === "text_delta" @@ -208,6 +250,7 @@ class AnthropicLLM { error: false, }); response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); resolve(fullText); } }); diff --git a/server/utils/AiProviders/apipie/index.js b/server/utils/AiProviders/apipie/index.js index 47b3aabc85..1f6dd68a0f 100644 --- a/server/utils/AiProviders/apipie/index.js +++ b/server/utils/AiProviders/apipie/index.js @@ -4,10 +4,13 @@ const { writeResponseChunk, clientAbortedHandler, } = require("../../helpers/chat/responses"); - const fs = require("fs"); const path = require("path"); const { safeJsonParse } = require("../../http"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); + const cacheFolder = path.resolve( process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, "models", "apipie") @@ -188,19 +191,35 @@ class ApiPieLLM { `ApiPie chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage?.prompt_tokens || 0, + completion_tokens: result.output.usage?.completion_tokens || 0, + total_tokens: result.output.usage?.total_tokens || 0, + outputTps: + (result.output.usage?.completion_tokens || 0) / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -209,13 +228,16 @@ class ApiPieLLM { `ApiPie chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { @@ -228,7 +250,12 @@ class ApiPieLLM { // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. 
- const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); try { @@ -258,6 +285,9 @@ class ApiPieLLM { error: false, }); response.removeListener("close", handleAbort); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); resolve(fullText); } } @@ -271,6 +301,9 @@ class ApiPieLLM { error: e.message, }); response.removeListener("close", handleAbort); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); resolve(fullText); } }); diff --git a/server/utils/AiProviders/azureOpenAi/index.js b/server/utils/AiProviders/azureOpenAi/index.js index 98c6d51530..078f55eef0 100644 --- a/server/utils/AiProviders/azureOpenAi/index.js +++ b/server/utils/AiProviders/azureOpenAi/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { writeResponseChunk, clientAbortedHandler, @@ -114,11 +117,28 @@ class AzureOpenAiLLM { "No OPEN_MODEL_PREF ENV defined. This must the name of a deployment on your Azure account for an LLM chat model like GPT-3.5." ); - const data = await this.openai.getChatCompletions(this.model, messages, { - temperature, - }); - if (!data.hasOwnProperty("choices")) return null; - return data.choices[0].message.content; + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.getChatCompletions(this.model, messages, { + temperature, + }) + ); + + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) + return null; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.promptTokens || 0, + completion_tokens: result.output.usage.completionTokens || 0, + total_tokens: result.output.usage.totalTokens || 0, + outputTps: result.output.usage.completionTokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = [], { temperature = 0.7 }) { @@ -127,28 +147,43 @@ class AzureOpenAiLLM { "No OPEN_MODEL_PREF ENV defined. This must the name of a deployment on your Azure account for an LLM chat model like GPT-3.5." ); - const stream = await this.openai.streamChatCompletions( - this.model, - messages, - { + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + await this.openai.streamChatCompletions(this.model, messages, { temperature, n: 1, - } + }), + messages ); - return stream; + + return measuredStreamRequest; } + /** + * Handles the stream response from the AzureOpenAI API. + * Azure does not return the usage metrics in the stream response, but 1msg = 1token + * so we can estimate the completion tokens by counting the number of messages. 
+ * @param {Object} response - the response object + * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - the stream response from the AzureOpenAI API w/tracking + * @param {Object} responseProps - the response properties + * @returns {Promise} + */ handleStream(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; return new Promise(async (resolve) => { let fullText = ""; + let usage = { + completion_tokens: 0, + }; // Establish listener to early-abort a streaming response // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. - const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement(usage); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); for await (const event of stream) { @@ -156,6 +191,8 @@ class AzureOpenAiLLM { const delta = choice.delta?.content; if (!delta) continue; fullText += delta; + usage.completion_tokens++; + writeResponseChunk(response, { uuid, sources: [], @@ -176,6 +213,7 @@ class AzureOpenAiLLM { error: false, }); response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); resolve(fullText); }); } diff --git a/server/utils/AiProviders/bedrock/index.js b/server/utils/AiProviders/bedrock/index.js index 8d9be2d392..171d7b4595 100644 --- a/server/utils/AiProviders/bedrock/index.js +++ b/server/utils/AiProviders/bedrock/index.js @@ -4,6 +4,9 @@ const { clientAbortedHandler, } = require("../../helpers/chat/responses"); const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); // Docs: https://js.langchain.com/v0.2/docs/integrations/chat/bedrock_converse class AWSBedrockLLM { @@ -82,7 +85,7 @@ class AWSBedrockLLM { } // For streaming we use Langchain's wrapper to handle weird chunks - // or otherwise absorb headaches that can arise from Ollama models + // or otherwise absorb headaches that can arise from Bedrock models #convertToLangchainPrototypes(chats = []) { const { HumanMessage, @@ -219,40 +222,73 @@ class AWSBedrockLLM { async getChatCompletion(messages = null, { temperature = 0.7 }) { const model = this.#bedrockClient({ temperature }); - const textResponse = await model - .pipe(new StringOutputParser()) - .invoke(this.#convertToLangchainPrototypes(messages)) - .catch((e) => { - throw new Error( - `AWSBedrock::getChatCompletion failed to communicate with Ollama. ${e.message}` - ); - }); - - if (!textResponse || !textResponse.length) - throw new Error(`AWSBedrock::getChatCompletion text response was empty.`); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + model + .pipe(new StringOutputParser()) + .invoke(this.#convertToLangchainPrototypes(messages)) + .catch((e) => { + throw new Error( + `AWSBedrock::getChatCompletion failed to communicate with Bedrock client. 
${e.message}` + ); + }) + ); - return textResponse; + if (!result.output || result.output.length === 0) return null; + + // Langchain does not return the usage metrics in the response so we estimate them + const promptTokens = LLMPerformanceMonitor.countTokens(messages); + const completionTokens = LLMPerformanceMonitor.countTokens([ + { content: result.output }, + ]); + + return { + textResponse: result.output, + metrics: { + prompt_tokens: promptTokens, + completion_tokens: completionTokens, + total_tokens: promptTokens + completionTokens, + outputTps: completionTokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { const model = this.#bedrockClient({ temperature }); - const stream = await model - .pipe(new StringOutputParser()) - .stream(this.#convertToLangchainPrototypes(messages)); - return stream; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + model + .pipe(new StringOutputParser()) + .stream(this.#convertToLangchainPrototypes(messages)), + messages + ); + return measuredStreamRequest; } + /** + * Handles the stream response from the AWS Bedrock API. + * Bedrock does not support usage metrics in the stream response so we need to estimate them. + * @param {Object} response - the response object + * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - the stream response from the AWS Bedrock API w/tracking + * @param {Object} responseProps - the response properties + * @returns {Promise} + */ handleStream(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; return new Promise(async (resolve) => { let fullText = ""; + let usage = { + completion_tokens: 0, + }; // Establish listener to early-abort a streaming response // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. - const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement(usage); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); try { @@ -266,6 +302,7 @@ class AWSBedrockLLM { ? 
chunk.content : chunk; fullText += content; + if (!!content) usage.completion_tokens++; // Dont count empty chunks writeResponseChunk(response, { uuid, sources: [], @@ -285,6 +322,7 @@ class AWSBedrockLLM { error: false, }); response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); resolve(fullText); } catch (error) { writeResponseChunk(response, { @@ -298,6 +336,7 @@ class AWSBedrockLLM { }`, }); response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); } }); } diff --git a/server/utils/AiProviders/cohere/index.js b/server/utils/AiProviders/cohere/index.js index f61d43f623..33b65df21c 100644 --- a/server/utils/AiProviders/cohere/index.js +++ b/server/utils/AiProviders/cohere/index.js @@ -2,6 +2,9 @@ const { v4 } = require("uuid"); const { writeResponseChunk } = require("../../helpers/chat/responses"); const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { MODEL_MAP } = require("../modelMap"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); class CohereLLM { constructor(embedder = null) { @@ -101,15 +104,33 @@ class CohereLLM { const message = messages[messages.length - 1].content; // Get the last message const cohereHistory = this.#convertChatHistoryCohere(messages.slice(0, -1)); // Remove the last message and convert to Cohere - const chat = await this.cohere.chat({ - model: this.model, - message: message, - chatHistory: cohereHistory, - temperature, - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.cohere.chat({ + model: this.model, + message: message, + chatHistory: cohereHistory, + temperature, + }) + ); + + if ( + !result.output.hasOwnProperty("text") || + result.output.text.length === 0 + ) + return null; - if (!chat.hasOwnProperty("text")) return null; - return chat.text; + const promptTokens = result.output.meta?.tokens?.inputTokens || 0; + const completionTokens = result.output.meta?.tokens?.outputTokens || 0; + return { + textResponse: result.output.text, + metrics: { + prompt_tokens: promptTokens, + completion_tokens: completionTokens, + total_tokens: promptTokens + completionTokens, + outputTps: completionTokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -120,21 +141,35 @@ class CohereLLM { const message = messages[messages.length - 1].content; // Get the last message const cohereHistory = this.#convertChatHistoryCohere(messages.slice(0, -1)); // Remove the last message and convert to Cohere + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.cohere.chatStream({ + model: this.model, + message: message, + chatHistory: cohereHistory, + temperature, + }), + messages, + false + ); - const stream = await this.cohere.chatStream({ - model: this.model, - message: message, - chatHistory: cohereHistory, - temperature, - }); - - return { type: "stream", stream: stream }; + return measuredStreamRequest; } + /** + * Handles the stream response from the Cohere API. 
+ * @param {Object} response - the response object + * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - the stream response from the Cohere API w/tracking + * @param {Object} responseProps - the response properties + * @returns {Promise} + */ async handleStream(response, stream, responseProps) { return new Promise(async (resolve) => { - let fullText = ""; const { uuid = v4(), sources = [] } = responseProps; + let fullText = ""; + let usage = { + prompt_tokens: 0, + completion_tokens: 0, + }; const handleAbort = () => { writeResponseChunk(response, { @@ -146,12 +181,19 @@ class CohereLLM { error: false, }); response.removeListener("close", handleAbort); + stream.endMeasurement(usage); resolve(fullText); }; response.on("close", handleAbort); try { - for await (const chat of stream.stream) { + for await (const chat of stream) { + if (chat.eventType === "stream-end") { + const usageMetrics = chat?.response?.meta?.tokens || {}; + usage.prompt_tokens = usageMetrics.inputTokens || 0; + usage.completion_tokens = usageMetrics.outputTokens || 0; + } + if (chat.eventType === "text-generation") { const text = chat.text; fullText += text; @@ -176,6 +218,7 @@ class CohereLLM { error: false, }); response.removeListener("close", handleAbort); + stream.endMeasurement(usage); resolve(fullText); } catch (error) { writeResponseChunk(response, { @@ -187,6 +230,7 @@ class CohereLLM { error: error.message, }); response.removeListener("close", handleAbort); + stream.endMeasurement(usage); resolve(fullText); } }); diff --git a/server/utils/AiProviders/deepseek/index.js b/server/utils/AiProviders/deepseek/index.js index 5ef4c9a1c0..7bc804bbb6 100644 --- a/server/utils/AiProviders/deepseek/index.js +++ b/server/utils/AiProviders/deepseek/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -74,19 +77,34 @@ class DeepSeekLLM { `DeepSeek chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens || 0, + completion_tokens: result.output.usage.completion_tokens || 0, + total_tokens: result.output.usage.total_tokens || 0, + outputTps: result.output.usage.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -95,13 +113,18 @@ class DeepSeekLLM { `DeepSeek chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: 
this.model, + stream: true, + messages, + temperature, + }), messages, - temperature, - }); - return streamRequest; + false + ); + + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/fireworksAi/index.js b/server/utils/AiProviders/fireworksAi/index.js index 2332965fcc..97e5a07901 100644 --- a/server/utils/AiProviders/fireworksAi/index.js +++ b/server/utils/AiProviders/fireworksAi/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -84,15 +87,30 @@ class FireworksAiLLM { `FireworksAI chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions.create({ - model: this.model, - messages, - temperature, - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions.create({ + model: this.model, + messages, + temperature, + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens || 0, + completion_tokens: result.output.usage.completion_tokens || 0, + total_tokens: result.output.usage.total_tokens || 0, + outputTps: result.output.usage.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -101,13 +119,17 @@ class FireworksAiLLM { `FireworksAI chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), messages, - temperature, - }); - return streamRequest; + false + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/gemini/index.js b/server/utils/AiProviders/gemini/index.js index 28349f492b..f658b3c5f1 100644 --- a/server/utils/AiProviders/gemini/index.js +++ b/server/utils/AiProviders/gemini/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { writeResponseChunk, clientAbortedHandler, @@ -227,13 +230,29 @@ class GeminiLLM { history: this.formatMessages(messages), safetySettings: this.#safetySettings(), }); - const result = await chatThread.sendMessage(prompt); - const response = result.response; - const responseText = response.text(); + const { output: result, duration } = + await LLMPerformanceMonitor.measureAsyncFunction( + chatThread.sendMessage(prompt) + ); + const responseText = result.response.text(); if (!responseText) throw new Error("Gemini: No response could be parsed."); - return responseText; + const promptTokens = LLMPerformanceMonitor.countTokens(messages); + const completionTokens = LLMPerformanceMonitor.countTokens([ + { content: responseText }, + ]); + + return { + textResponse: responseText, + metrics: { + 
prompt_tokens: promptTokens, + completion_tokens: completionTokens, + total_tokens: promptTokens + completionTokens, + outputTps: (promptTokens + completionTokens) / duration, + duration, + }, + }; } async streamGetChatCompletion(messages = [], _opts = {}) { @@ -249,11 +268,14 @@ class GeminiLLM { history: this.formatMessages(messages), safetySettings: this.#safetySettings(), }); - const responseStream = await chatThread.sendMessageStream(prompt); - if (!responseStream.stream) - throw new Error("Could not stream response stream from Gemini."); + const responseStream = await LLMPerformanceMonitor.measureStream( + (await chatThread.sendMessageStream(prompt)).stream, + messages + ); - return responseStream.stream; + if (!responseStream) + throw new Error("Could not stream response stream from Gemini."); + return responseStream; } async compressMessages(promptArgs = {}, rawHistory = []) { @@ -264,6 +286,10 @@ class GeminiLLM { handleStream(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; + // Usage is not available for Gemini streams + // so we need to calculate the completion tokens manually + // because 1 chunk != 1 token in gemini responses and it buffers + // many tokens before sending them to the client as a "chunk" return new Promise(async (resolve) => { let fullText = ""; @@ -272,7 +298,14 @@ class GeminiLLM { // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. - const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens([ + { content: fullText }, + ]), + }); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); for await (const chunk of stream) { @@ -292,6 +325,7 @@ class GeminiLLM { close: true, error: e.message, }); + stream?.endMeasurement({ completion_tokens: 0 }); resolve(e.message); return; } @@ -316,6 +350,11 @@ class GeminiLLM { error: false, }); response.removeListener("close", handleAbort); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens([ + { content: fullText }, + ]), + }); resolve(fullText); }); } diff --git a/server/utils/AiProviders/genericOpenAi/index.js b/server/utils/AiProviders/genericOpenAi/index.js index 8d17aa257c..57c8f6a144 100644 --- a/server/utils/AiProviders/genericOpenAi/index.js +++ b/server/utils/AiProviders/genericOpenAi/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -139,31 +142,52 @@ class GenericOpenAiLLM { } async getChatCompletion(messages = null, { temperature = 0.7 }) { - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - max_tokens: this.maxTokens, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + max_tokens: this.maxTokens, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 
+ ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output?.usage?.prompt_tokens || 0, + completion_tokens: result.output?.usage?.completion_tokens || 0, + total_tokens: result.output?.usage?.total_tokens || 0, + outputTps: + (result.output?.usage?.completion_tokens || 0) / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - max_tokens: this.maxTokens, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + max_tokens: this.maxTokens, + }), + messages + // runPromptTokenCalculation: true - There is not way to know if the generic provider connected is returning + // the correct usage metrics if any at all since any provider could be connected. + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/groq/index.js b/server/utils/AiProviders/groq/index.js index d928e5e0d1..5793002f6c 100644 --- a/server/utils/AiProviders/groq/index.js +++ b/server/utils/AiProviders/groq/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -170,19 +173,36 @@ class GroqLLM { `GroqAI:chatCompletion: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens || 0, + completion_tokens: result.output.usage.completion_tokens || 0, + total_tokens: result.output.usage.total_tokens || 0, + outputTps: + result.output.usage.completion_tokens / + result.output.usage.completion_time, + duration: result.output.usage.total_time, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -191,13 +211,18 @@ class GroqLLM { `GroqAI:streamChatCompletion: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), messages, - temperature, - }); - return streamRequest; + false + ); + + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/huggingface/index.js 
b/server/utils/AiProviders/huggingface/index.js index 021a636b3d..f4b6100e0d 100644 --- a/server/utils/AiProviders/huggingface/index.js +++ b/server/utils/AiProviders/huggingface/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -87,25 +90,48 @@ class HuggingFaceLLM { } async getChatCompletion(messages = null, { temperature = 0.7 }) { - const result = await this.openai.chat.completions.create({ - model: this.model, - messages, - temperature, - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage?.prompt_tokens || 0, + completion_tokens: result.output.usage?.completion_tokens || 0, + total_tokens: result.output.usage?.total_tokens || 0, + outputTps: + (result.output.usage?.completion_tokens || 0) / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/koboldCPP/index.js b/server/utils/AiProviders/koboldCPP/index.js index 9a700793da..0e5206cabe 100644 --- a/server/utils/AiProviders/koboldCPP/index.js +++ b/server/utils/AiProviders/koboldCPP/index.js @@ -3,6 +3,9 @@ const { clientAbortedHandler, writeResponseChunk, } = require("../../helpers/chat/responses"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { v4: uuidv4 } = require("uuid"); class KoboldCPPLLM { @@ -122,38 +125,71 @@ class KoboldCPPLLM { } async getChatCompletion(messages = null, { temperature = 0.7 }) { - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + const promptTokens = LLMPerformanceMonitor.countTokens(messages); + const completionTokens = LLMPerformanceMonitor.countTokens([ + { content: result.output.choices[0].message.content }, + ]); + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: 
promptTokens, + completion_tokens: completionTokens, + total_tokens: promptTokens + completionTokens, + outputTps: completionTokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; - // Custom handler for KoboldCPP stream responses return new Promise(async (resolve) => { let fullText = ""; - const handleAbort = () => clientAbortedHandler(resolve, fullText); + let usage = { + prompt_tokens: LLMPerformanceMonitor.countTokens(stream.messages || []), + completion_tokens: 0, + }; + + const handleAbort = () => { + usage.completion_tokens = LLMPerformanceMonitor.countTokens([ + { content: fullText }, + ]); + stream?.endMeasurement(usage); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); for await (const chunk of stream) { @@ -187,6 +223,10 @@ class KoboldCPPLLM { error: false, }); response.removeListener("close", handleAbort); + usage.completion_tokens = LLMPerformanceMonitor.countTokens([ + { content: fullText }, + ]); + stream?.endMeasurement(usage); resolve(fullText); } } diff --git a/server/utils/AiProviders/liteLLM/index.js b/server/utils/AiProviders/liteLLM/index.js index d8907e7a90..63f4115bc9 100644 --- a/server/utils/AiProviders/liteLLM/index.js +++ b/server/utils/AiProviders/liteLLM/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -121,31 +124,53 @@ class LiteLLM { } async getChatCompletion(messages = null, { temperature = 0.7 }) { - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - max_tokens: parseInt(this.maxTokens), // LiteLLM requires int - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + max_tokens: parseInt(this.maxTokens), // LiteLLM requires int + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage?.prompt_tokens || 0, + completion_tokens: result.output.usage?.completion_tokens || 0, + total_tokens: result.output.usage?.total_tokens || 0, + outputTps: + (result.output.usage?.completion_tokens || 0) / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - max_tokens: 
parseInt(this.maxTokens), // LiteLLM requires int - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + max_tokens: parseInt(this.maxTokens), // LiteLLM requires int + }), + messages + // runPromptTokenCalculation: true - We manually count the tokens because they may or may not be provided in the stream + // responses depending on LLM connected. If they are provided, then we counted for nothing, but better than nothing. + ); + + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/lmStudio/index.js b/server/utils/AiProviders/lmStudio/index.js index 6a30ef7428..082576c98a 100644 --- a/server/utils/AiProviders/lmStudio/index.js +++ b/server/utils/AiProviders/lmStudio/index.js @@ -2,6 +2,9 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); // hybrid of openAi LLM chat completion for LMStudio class LMStudioLLM { @@ -128,15 +131,30 @@ class LMStudioLLM { `LMStudio chat: ${this.model} is not valid or defined model for chat completion!` ); - const result = await this.lmstudio.chat.completions.create({ - model: this.model, - messages, - temperature, - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.lmstudio.chat.completions.create({ + model: this.model, + messages, + temperature, + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage?.prompt_tokens || 0, + completion_tokens: result.output.usage?.completion_tokens || 0, + total_tokens: result.output.usage?.total_tokens || 0, + outputTps: result.output.usage?.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -145,13 +163,16 @@ class LMStudioLLM { `LMStudio chat: ${this.model} is not valid or defined model for chat completion!` ); - const streamRequest = await this.lmstudio.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.lmstudio.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/localAi/index.js b/server/utils/AiProviders/localAi/index.js index 2d5e8b1f45..53da280f2d 100644 --- a/server/utils/AiProviders/localAi/index.js +++ b/server/utils/AiProviders/localAi/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -114,15 +117,35 @@ class LocalAiLLM { `LocalAI chat: ${this.model} is not valid for chat completion!` ); - const result = await 
this.openai.chat.completions.create({ - model: this.model, - messages, - temperature, - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions.create({ + model: this.model, + messages, + temperature, + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + const promptTokens = LLMPerformanceMonitor.countTokens(messages); + const completionTokens = LLMPerformanceMonitor.countTokens( + result.output.choices[0].message.content + ); + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: promptTokens, + completion_tokens: completionTokens, + total_tokens: promptTokens + completionTokens, + outputTps: completionTokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -131,13 +154,16 @@ class LocalAiLLM { `LocalAi chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/mistral/index.js b/server/utils/AiProviders/mistral/index.js index 8a655da323..219f6f52f9 100644 --- a/server/utils/AiProviders/mistral/index.js +++ b/server/utils/AiProviders/mistral/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -103,15 +106,34 @@ class MistralLLM { `Mistral chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions.create({ - model: this.model, - messages, - temperature, - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens || 0, + completion_tokens: result.output.usage.completion_tokens || 0, + total_tokens: result.output.usage.total_tokens || 0, + outputTps: result.output.usage.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -120,13 +142,17 @@ class MistralLLM { `Mistral chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + 
messages, + temperature, + }), messages, - temperature, - }); - return streamRequest; + false + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/native/index.js b/server/utils/AiProviders/native/index.js index 4d15cdac06..b7c0596cad 100644 --- a/server/utils/AiProviders/native/index.js +++ b/server/utils/AiProviders/native/index.js @@ -5,6 +5,9 @@ const { writeResponseChunk, clientAbortedHandler, } = require("../../helpers/chat/responses"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); // Docs: https://js.langchain.com/docs/integrations/chat/llama_cpp const ChatLlamaCpp = (...args) => @@ -126,16 +129,44 @@ class NativeLLM { async getChatCompletion(messages = null, { temperature = 0.7 }) { const model = await this.#llamaClient({ temperature }); - const response = await model.call(messages); - return response.content; + const result = await LLMPerformanceMonitor.measureAsyncFunction( + model.call(messages) + ); + + if (!result.output?.content) return null; + + const promptTokens = LLMPerformanceMonitor.countTokens(messages); + const completionTokens = LLMPerformanceMonitor.countTokens( + result.output.content + ); + return { + textResponse: result.output.content, + metrics: { + prompt_tokens: promptTokens, + completion_tokens: completionTokens, + total_tokens: promptTokens + completionTokens, + outputTps: completionTokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { const model = await this.#llamaClient({ temperature }); - const responseStream = await model.stream(messages); - return responseStream; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + model.stream(messages), + messages + ); + return measuredStreamRequest; } + /** + * Handles the default stream response for a chat. + * @param {import("express").Response} response + * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream + * @param {Object} responseProps + * @returns {Promise} + */ handleStream(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; @@ -146,7 +177,12 @@ class NativeLLM { // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. - const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); for await (const chunk of stream) { @@ -176,6 +212,9 @@ class NativeLLM { error: false, }); response.removeListener("close", handleAbort); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); resolve(fullText); }); } diff --git a/server/utils/AiProviders/novita/index.js b/server/utils/AiProviders/novita/index.js index f15d20d41f..c41f5a6667 100644 --- a/server/utils/AiProviders/novita/index.js +++ b/server/utils/AiProviders/novita/index.js @@ -7,6 +7,9 @@ const { const fs = require("fs"); const path = require("path"); const { safeJsonParse } = require("../../http"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const cacheFolder = path.resolve( process.env.STORAGE_DIR ? 
path.resolve(process.env.STORAGE_DIR, "models", "novita") @@ -188,19 +191,34 @@ class NovitaLLM { `Novita chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens || 0, + completion_tokens: result.output.usage.completion_tokens || 0, + total_tokens: result.output.usage.total_tokens || 0, + outputTps: result.output.usage.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -209,15 +227,25 @@ class NovitaLLM { `Novita chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + ); + return measuredStreamRequest; } + /** + * Handles the default stream response for a chat. + * @param {import("express").Response} response + * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream + * @param {Object} responseProps + * @returns {Promise} + */ handleStream(response, stream, responseProps) { const timeoutThresholdMs = this.timeout; const { uuid = uuidv4(), sources = [] } = responseProps; @@ -230,7 +258,12 @@ class NovitaLLM { // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. 
- const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); // NOTICE: Not all Novita models will return a stop reason @@ -259,6 +292,9 @@ class NovitaLLM { }); clearInterval(timeoutCheck); response.removeListener("close", handleAbort); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); resolve(fullText); } }, 500); @@ -291,6 +327,9 @@ class NovitaLLM { error: false, }); response.removeListener("close", handleAbort); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); resolve(fullText); } } @@ -304,6 +343,9 @@ class NovitaLLM { error: e.message, }); response.removeListener("close", handleAbort); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); resolve(fullText); } }); diff --git a/server/utils/AiProviders/nvidiaNim/index.js b/server/utils/AiProviders/nvidiaNim/index.js index 554b0eec54..3cf7f835f1 100644 --- a/server/utils/AiProviders/nvidiaNim/index.js +++ b/server/utils/AiProviders/nvidiaNim/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -153,15 +156,34 @@ class NvidiaNimLLM { `Nvidia NIM chat: ${this.model} is not valid or defined model for chat completion!` ); - const result = await this.nvidiaNim.chat.completions.create({ - model: this.model, - messages, - temperature, - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.nvidiaNim.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens || 0, + completion_tokens: result.output.usage.completion_tokens || 0, + total_tokens: result.output.usage.total_tokens || 0, + outputTps: result.output.usage.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -170,13 +192,16 @@ class NvidiaNimLLM { `Nvidia NIM chat: ${this.model} is not valid or defined model for chat completion!` ); - const streamRequest = await this.nvidiaNim.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.nvidiaNim.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/ollama/index.js b/server/utils/AiProviders/ollama/index.js index eb18ee6f33..b62c80929d 100644 --- a/server/utils/AiProviders/ollama/index.js +++ b/server/utils/AiProviders/ollama/index.js @@ -1,9 +1,12 @@ -const { StringOutputParser } = 
require("@langchain/core/output_parsers"); const { writeResponseChunk, clientAbortedHandler, } = require("../../helpers/chat/responses"); const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); +const { Ollama } = require("ollama"); // Docs: https://github.com/jmorganca/ollama/blob/main/docs/api.md class OllamaAILLM { @@ -23,49 +26,11 @@ class OllamaAILLM { user: this.promptWindowLimit() * 0.7, }; + this.client = new Ollama({ host: this.basePath }); this.embedder = embedder ?? new NativeEmbedder(); this.defaultTemp = 0.7; } - #ollamaClient({ temperature = 0.07 }) { - const { ChatOllama } = require("@langchain/community/chat_models/ollama"); - return new ChatOllama({ - baseUrl: this.basePath, - model: this.model, - keepAlive: this.keepAlive, - useMLock: true, - // There are currently only two performance settings so if its not "base" - its max context. - ...(this.performanceMode === "base" - ? {} - : { numCtx: this.promptWindowLimit() }), - temperature, - }); - } - - // For streaming we use Langchain's wrapper to handle weird chunks - // or otherwise absorb headaches that can arise from Ollama models - #convertToLangchainPrototypes(chats = []) { - const { - HumanMessage, - SystemMessage, - AIMessage, - } = require("@langchain/core/messages"); - const langchainChats = []; - const roleToMessageMap = { - system: SystemMessage, - user: HumanMessage, - assistant: AIMessage, - }; - - for (const chat of chats) { - if (!roleToMessageMap.hasOwnProperty(chat.role)) continue; - const MessageClass = roleToMessageMap[chat.role]; - langchainChats.push(new MessageClass({ content: chat.content })); - } - - return langchainChats; - } - #appendContext(contextTexts = []) { if (!contextTexts || !contextTexts.length) return ""; return ( @@ -105,21 +70,29 @@ class OllamaAILLM { /** * Generates appropriate content array for a message + attachments. * @param {{userPrompt:string, attachments: import("../../helpers").Attachment[]}} - * @returns {string|object[]} + * @returns {{content: string, images: string[]}} */ #generateContent({ userPrompt, attachments = [] }) { - if (!attachments.length) { - return { content: userPrompt }; - } + if (!attachments.length) return { content: userPrompt }; + const images = attachments.map( + (attachment) => attachment.contentString.split("base64,").slice(-1)[0] + ); + return { content: userPrompt, images }; + } - const content = [{ type: "text", text: userPrompt }]; - for (let attachment of attachments) { - content.push({ - type: "image_url", - image_url: attachment.contentString, - }); + /** + * Handles errors from the Ollama API to make them more user friendly. + * @param {Error} e + */ + #errorHandler(e) { + switch (e.message) { + case "fetch failed": + throw new Error( + "Your Ollama instance could not be reached or is not responding. Please make sure it is running the API server and your connection information is correct in AnythingLLM." + ); + default: + return e; } - return { content: content.flat() }; } /** @@ -149,41 +122,103 @@ class OllamaAILLM { } async getChatCompletion(messages = null, { temperature = 0.7 }) { - const model = this.#ollamaClient({ temperature }); - const textResponse = await model - .pipe(new StringOutputParser()) - .invoke(this.#convertToLangchainPrototypes(messages)) - .catch((e) => { - throw new Error( - `Ollama::getChatCompletion failed to communicate with Ollama. 
${e.message}` - ); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.client + .chat({ + model: this.model, + stream: false, + messages, + keep_alive: this.keepAlive, + options: { + temperature, + useMLock: true, + // There are currently only two performance settings so if it's not "base" - it's max context. + ...(this.performanceMode === "base" + ? {} + : { numCtx: this.promptWindowLimit() }), + }, + }) + .then((res) => { + return { + content: res.message.content, + usage: { + prompt_tokens: res.prompt_eval_count, + completion_tokens: res.eval_count, + total_tokens: res.prompt_eval_count + res.eval_count, + }, + }; + }) + .catch((e) => { + throw new Error( + `Ollama::getChatCompletion failed to communicate with Ollama. ${this.#errorHandler(e).message}` + ); + }) + ); - if (!textResponse || !textResponse.length) + if (!result.output.content || !result.output.content.length) throw new Error(`Ollama::getChatCompletion text response was empty.`); - return textResponse; + return { + textResponse: result.output.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens, + completion_tokens: result.output.usage.completion_tokens, + total_tokens: result.output.usage.total_tokens, + outputTps: result.output.usage.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { - const model = this.#ollamaClient({ temperature }); - const stream = await model - .pipe(new StringOutputParser()) - .stream(this.#convertToLangchainPrototypes(messages)); - return stream; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.client.chat({ + model: this.model, + stream: true, + messages, + keep_alive: this.keepAlive, + options: { + temperature, + useMLock: true, + // There are currently only two performance settings so if it's not "base" - it's max context. + ...(this.performanceMode === "base" + ? {} + : { numCtx: this.promptWindowLimit() }), + }, + }), + messages, + false + ).catch((e) => { + throw this.#errorHandler(e); + }); + return measuredStreamRequest; } + /** + * Handles streaming responses from Ollama. + * @param {import("express").Response} response + * @param {import("../../helpers/chat/LLMPerformanceMonitor").MonitoredStream} stream + * @param {Object} responseProps + * @returns {Promise} + */ handleStream(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; return new Promise(async (resolve) => { let fullText = ""; + let usage = { + prompt_tokens: 0, + completion_tokens: 0, + }; // Establish listener to early-abort a streaming response // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. - const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement(usage); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); try { @@ -193,30 +228,36 @@ "Stream returned undefined chunk. Aborting reply - check model provider logs." ); - const content = chunk.hasOwnProperty("content") - ?
chunk.content - : chunk; - fullText += content; - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: content, - close: false, - error: false, - }); - } + if (chunk.done) { + usage.prompt_tokens = chunk.prompt_eval_count; + usage.completion_tokens = chunk.eval_count; + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); + resolve(fullText); + break; + } - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - response.removeListener("close", handleAbort); - resolve(fullText); + if (chunk.hasOwnProperty("message")) { + const content = chunk.message.content; + fullText += content; + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: content, + close: false, + error: false, + }); + } + } } catch (error) { writeResponseChunk(response, { uuid, @@ -229,6 +270,8 @@ class OllamaAILLM { }`, }); response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); + resolve(fullText); } }); } diff --git a/server/utils/AiProviders/openAi/index.js b/server/utils/AiProviders/openAi/index.js index 2829a53747..4209b99ed2 100644 --- a/server/utils/AiProviders/openAi/index.js +++ b/server/utils/AiProviders/openAi/index.js @@ -3,6 +3,9 @@ const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); const { MODEL_MAP } = require("../modelMap"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); class OpenAiLLM { constructor(embedder = null, modelPreference = null) { @@ -132,19 +135,34 @@ class OpenAiLLM { `OpenAI chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature: this.isO1Model ? 1 : temperature, // o1 models only accept temperature 1 - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature: this.isO1Model ? 1 : temperature, // o1 models only accept temperature 1 + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens || 0, + completion_tokens: result.output.usage.completion_tokens || 0, + total_tokens: result.output.usage.total_tokens || 0, + outputTps: result.output.usage.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -153,13 +171,19 @@ class OpenAiLLM { `OpenAI chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature: this.isO1Model ? 
1 : temperature, // o1 models only accept temperature 1 - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature: this.isO1Model ? 1 : temperature, // o1 models only accept temperature 1 + }), + messages + // runPromptTokenCalculation: true - We manually count the tokens because OpenAI does not provide them in the stream + // since we are not using the OpenAI API version that supports this `stream_options` param. + ); + + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/openRouter/index.js b/server/utils/AiProviders/openRouter/index.js index 3ec813423e..3abab76344 100644 --- a/server/utils/AiProviders/openRouter/index.js +++ b/server/utils/AiProviders/openRouter/index.js @@ -7,6 +7,9 @@ const { const fs = require("fs"); const path = require("path"); const { safeJsonParse } = require("../../http"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const cacheFolder = path.resolve( process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, "models", "openrouter") @@ -190,19 +193,34 @@ class OpenRouterLLM { `OpenRouter chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens || 0, + completion_tokens: result.output.usage.completion_tokens || 0, + total_tokens: result.output.usage.total_tokens || 0, + outputTps: result.output.usage.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -211,15 +229,32 @@ class OpenRouterLLM { `OpenRouter chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + // We have to manually count the tokens + // OpenRouter has a ton of providers and they all can return slightly differently + // some return chunk.usage on STOP, some do it after stop, its inconsistent. + // So it is possible reported metrics are inaccurate since we cannot reliably + // catch the metrics before resolving the stream - so we just pretend this functionality + // is not available. + ); + + return measuredStreamRequest; } + /** + * Handles the default stream response for a chat. 
+ * @param {import("express").Response} response + * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream + * @param {Object} responseProps + * @returns {Promise} + */ handleStream(response, stream, responseProps) { const timeoutThresholdMs = this.timeout; const { uuid = uuidv4(), sources = [] } = responseProps; @@ -232,7 +267,12 @@ class OpenRouterLLM { // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. - const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); // NOTICE: Not all OpenRouter models will return a stop reason @@ -261,6 +301,9 @@ class OpenRouterLLM { }); clearInterval(timeoutCheck); response.removeListener("close", handleAbort); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); resolve(fullText); } }, 500); @@ -293,6 +336,10 @@ class OpenRouterLLM { error: false, }); response.removeListener("close", handleAbort); + clearInterval(timeoutCheck); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); resolve(fullText); } } @@ -306,6 +353,10 @@ class OpenRouterLLM { error: e.message, }); response.removeListener("close", handleAbort); + clearInterval(timeoutCheck); + stream?.endMeasurement({ + completion_tokens: LLMPerformanceMonitor.countTokens(fullText), + }); resolve(fullText); } }); diff --git a/server/utils/AiProviders/perplexity/index.js b/server/utils/AiProviders/perplexity/index.js index 93639f9f14..c365c17eb1 100644 --- a/server/utils/AiProviders/perplexity/index.js +++ b/server/utils/AiProviders/perplexity/index.js @@ -2,6 +2,9 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); function perplexityModels() { const { MODELS } = require("./models.js"); @@ -86,19 +89,34 @@ class PerplexityLLM { `Perplexity chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage?.prompt_tokens || 0, + completion_tokens: result.output.usage?.completion_tokens || 0, + total_tokens: result.output.usage?.total_tokens || 0, + outputTps: result.output.usage?.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -107,13 +125,16 @@ class PerplexityLLM { `Perplexity chat: ${this.model} is not valid for chat 
completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/textGenWebUI/index.js b/server/utils/AiProviders/textGenWebUI/index.js index 68d7a6ac85..f1c3590bff 100644 --- a/server/utils/AiProviders/textGenWebUI/index.js +++ b/server/utils/AiProviders/textGenWebUI/index.js @@ -2,6 +2,9 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); class TextGenWebUILLM { constructor(embedder = null) { @@ -119,29 +122,47 @@ class TextGenWebUILLM { } async getChatCompletion(messages = null, { temperature = 0.7 }) { - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage?.prompt_tokens || 0, + completion_tokens: result.output.usage?.completion_tokens || 0, + total_tokens: result.output.usage?.total_tokens || 0, + outputTps: result.output.usage?.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, - messages, - temperature, - }); - return streamRequest; + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), + messages + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/togetherAi/index.js b/server/utils/AiProviders/togetherAi/index.js index b255a5a9d3..7213bdd08b 100644 --- a/server/utils/AiProviders/togetherAi/index.js +++ b/server/utils/AiProviders/togetherAi/index.js @@ -2,6 +2,9 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); function togetherAiModels() { const { MODELS } = require("./models.js"); @@ -109,15 +112,34 @@ class TogetherAiLLM { `TogetherAI chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions.create({ - model: this.model, - messages, - temperature, - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions 
+ .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage?.prompt_tokens || 0, + completion_tokens: result.output.usage?.completion_tokens || 0, + total_tokens: result.output.usage?.total_tokens || 0, + outputTps: result.output.usage?.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -126,13 +148,17 @@ class TogetherAiLLM { `TogetherAI chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), messages, - temperature, - }); - return streamRequest; + false + ); + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git a/server/utils/AiProviders/xai/index.js b/server/utils/AiProviders/xai/index.js index 7a25760df5..b18aae98ce 100644 --- a/server/utils/AiProviders/xai/index.js +++ b/server/utils/AiProviders/xai/index.js @@ -1,4 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +const { + LLMPerformanceMonitor, +} = require("../../helpers/chat/LLMPerformanceMonitor"); const { handleDefaultStreamResponseV2, } = require("../../helpers/chat/responses"); @@ -114,19 +117,34 @@ class XAiLLM { `xAI chat: ${this.model} is not valid for chat completion!` ); - const result = await this.openai.chat.completions - .create({ - model: this.model, - messages, - temperature, - }) - .catch((e) => { - throw new Error(e.message); - }); + const result = await LLMPerformanceMonitor.measureAsyncFunction( + this.openai.chat.completions + .create({ + model: this.model, + messages, + temperature, + }) + .catch((e) => { + throw new Error(e.message); + }) + ); - if (!result.hasOwnProperty("choices") || result.choices.length === 0) + if ( + !result.output.hasOwnProperty("choices") || + result.output.choices.length === 0 + ) return null; - return result.choices[0].message.content; + + return { + textResponse: result.output.choices[0].message.content, + metrics: { + prompt_tokens: result.output.usage.prompt_tokens || 0, + completion_tokens: result.output.usage.completion_tokens || 0, + total_tokens: result.output.usage.total_tokens || 0, + outputTps: result.output.usage.completion_tokens / result.duration, + duration: result.duration, + }, + }; } async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { @@ -135,13 +153,18 @@ class XAiLLM { `xAI chat: ${this.model} is not valid for chat completion!` ); - const streamRequest = await this.openai.chat.completions.create({ - model: this.model, - stream: true, + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + this.openai.chat.completions.create({ + model: this.model, + stream: true, + messages, + temperature, + }), messages, - temperature, - }); - return streamRequest; + false + ); + + return measuredStreamRequest; } handleStream(response, stream, responseProps) { diff --git 
a/server/utils/chats/apiChatHandler.js b/server/utils/chats/apiChatHandler.js index f37508534f..7ba45fed62 100644 --- a/server/utils/chats/apiChatHandler.js +++ b/server/utils/chats/apiChatHandler.js @@ -18,6 +18,7 @@ const { Telemetry } = require("../../models/telemetry"); * @property {object[]} sources * @property {boolean} close * @property {string|null} error + * @property {object} metrics */ /** @@ -120,6 +121,7 @@ async function chatSync({ text: textResponse, sources: [], type: chatMode, + metrics: {}, }, include: false, apiSessionId: sessionId, @@ -132,6 +134,7 @@ async function chatSync({ close: true, error: null, textResponse, + metrics: {}, }; } @@ -193,6 +196,7 @@ async function chatSync({ sources: [], close: true, error: vectorSearchResults.message, + metrics: {}, }; } @@ -228,6 +232,7 @@ async function chatSync({ text: textResponse, sources: [], type: chatMode, + metrics: {}, }, threadId: thread?.id || null, include: false, @@ -242,6 +247,7 @@ async function chatSync({ close: true, error: null, textResponse, + metrics: {}, }; } @@ -259,9 +265,10 @@ async function chatSync({ ); // Send the text completion. - const textResponse = await LLMConnector.getChatCompletion(messages, { - temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp, - }); + const { textResponse, metrics: performanceMetrics } = + await LLMConnector.getChatCompletion(messages, { + temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp, + }); if (!textResponse) { return { @@ -271,13 +278,19 @@ async function chatSync({ sources: [], close: true, error: "No text completion could be completed with this input.", + metrics: performanceMetrics, }; } const { chat } = await WorkspaceChats.new({ workspaceId: workspace.id, prompt: message, - response: { text: textResponse, sources, type: chatMode }, + response: { + text: textResponse, + sources, + type: chatMode, + metrics: performanceMetrics, + }, threadId: thread?.id || null, apiSessionId: sessionId, user, @@ -291,6 +304,7 @@ async function chatSync({ chatId: chat.id, textResponse, sources, + metrics: performanceMetrics, }; } @@ -396,6 +410,7 @@ async function streamChat({ attachments: [], close: true, error: null, + metrics: {}, }); await WorkspaceChats.new({ workspaceId: workspace.id, @@ -405,6 +420,7 @@ async function streamChat({ sources: [], type: chatMode, attachments: [], + metrics: {}, }, threadId: thread?.id || null, apiSessionId: sessionId, @@ -418,6 +434,7 @@ async function streamChat({ // 1. Chatting in "chat" mode and may or may _not_ have embeddings // 2. Chatting in "query" mode and has at least 1 embedding let completeText; + let metrics = {}; let contextTexts = []; let sources = []; let pinnedDocIdentifiers = []; @@ -479,6 +496,7 @@ async function streamChat({ sources: [], close: true, error: vectorSearchResults.message, + metrics: {}, }); return; } @@ -514,6 +532,7 @@ async function streamChat({ sources: [], close: true, error: null, + metrics: {}, }); await WorkspaceChats.new({ @@ -524,6 +543,7 @@ async function streamChat({ sources: [], type: chatMode, attachments: [], + metrics: {}, }, threadId: thread?.id || null, apiSessionId: sessionId, @@ -552,9 +572,12 @@ async function streamChat({ console.log( `\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.` ); - completeText = await LLMConnector.getChatCompletion(messages, { - temperature: workspace?.openAiTemp ?? 
LLMConnector.defaultTemp, - }); + const { textResponse, metrics: performanceMetrics } = + await LLMConnector.getChatCompletion(messages, { + temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp, + }); + completeText = textResponse; + metrics = performanceMetrics; writeResponseChunk(response, { uuid, sources, @@ -562,6 +585,7 @@ async function streamChat({ textResponse: completeText, close: true, error: false, + metrics, }); } else { const stream = await LLMConnector.streamGetChatCompletion(messages, { @@ -571,13 +595,14 @@ async function streamChat({ uuid, sources, }); + metrics = stream.metrics; } if (completeText?.length > 0) { const { chat } = await WorkspaceChats.new({ workspaceId: workspace.id, prompt: message, - response: { text: completeText, sources, type: chatMode }, + response: { text: completeText, sources, type: chatMode, metrics }, threadId: thread?.id || null, apiSessionId: sessionId, user, @@ -589,6 +614,7 @@ async function streamChat({ close: true, error: false, chatId: chat.id, + metrics, }); return; } diff --git a/server/utils/chats/embed.js b/server/utils/chats/embed.js index b4d1a03fbc..7196d161e2 100644 --- a/server/utils/chats/embed.js +++ b/server/utils/chats/embed.js @@ -54,6 +54,7 @@ async function streamChatWithForEmbed( } let completeText; + let metrics = {}; let contextTexts = []; let sources = []; let pinnedDocIdentifiers = []; @@ -164,9 +165,12 @@ async function streamChatWithForEmbed( console.log( `\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.` ); - completeText = await LLMConnector.getChatCompletion(messages, { - temperature: embed.workspace?.openAiTemp ?? LLMConnector.defaultTemp, - }); + const { textResponse, metrics: performanceMetrics } = + await LLMConnector.getChatCompletion(messages, { + temperature: embed.workspace?.openAiTemp ?? LLMConnector.defaultTemp, + }); + completeText = textResponse; + metrics = performanceMetrics; writeResponseChunk(response, { uuid, sources: [], @@ -183,12 +187,13 @@ async function streamChatWithForEmbed( uuid, sources: [], }); + metrics = stream.metrics; } await EmbedChats.new({ embedId: embed.id, prompt: message, - response: { text: completeText, type: chatMode, sources }, + response: { text: completeText, type: chatMode, sources, metrics }, connection_information: response.locals.connection ? { ...response.locals.connection, diff --git a/server/utils/chats/openaiCompatible.js b/server/utils/chats/openaiCompatible.js index bd984b76ec..b66d0f47b6 100644 --- a/server/utils/chats/openaiCompatible.js +++ b/server/utils/chats/openaiCompatible.js @@ -156,10 +156,13 @@ async function chatSync({ }); // Send the text completion. - const textResponse = await LLMConnector.getChatCompletion(messages, { - temperature: - temperature ?? workspace?.openAiTemp ?? LLMConnector.defaultTemp, - }); + const { textResponse, metrics } = await LLMConnector.getChatCompletion( + messages, + { + temperature: + temperature ?? workspace?.openAiTemp ?? 
LLMConnector.defaultTemp, + } + ); if (!textResponse) { return formatJSON( @@ -171,14 +174,14 @@ async function chatSync({ error: "No text completion could be completed with this input.", textResponse: null, }, - { model: workspace.slug, finish_reason: "no_content" } + { model: workspace.slug, finish_reason: "no_content", usage: metrics } ); } const { chat } = await WorkspaceChats.new({ workspaceId: workspace.id, prompt: prompt, - response: { text: textResponse, sources, type: chatMode }, + response: { text: textResponse, sources, type: chatMode, metrics }, }); return formatJSON( @@ -191,7 +194,7 @@ async function chatSync({ textResponse, sources, }, - { model: workspace.slug, finish_reason: "stop" } + { model: workspace.slug, finish_reason: "stop", usage: metrics } ); } @@ -414,7 +417,12 @@ async function streamChat({ const { chat } = await WorkspaceChats.new({ workspaceId: workspace.id, prompt: prompt, - response: { text: completeText, sources, type: chatMode }, + response: { + text: completeText, + sources, + type: chatMode, + metrics: stream.metrics, + }, }); writeResponseChunk( @@ -428,7 +436,12 @@ async function streamChat({ chatId: chat.id, textResponse: "", }, - { chunked: true, model: workspace.slug, finish_reason: "stop" } + { + chunked: true, + model: workspace.slug, + finish_reason: "stop", + usage: stream.metrics, + } ) ); return; @@ -444,13 +457,21 @@ async function streamChat({ error: false, textResponse: "", }, - { chunked: true, model: workspace.slug, finish_reason: "stop" } + { + chunked: true, + model: workspace.slug, + finish_reason: "stop", + usage: stream.metrics, + } ) ); return; } -function formatJSON(chat, { chunked = false, model, finish_reason = null }) { +function formatJSON( + chat, + { chunked = false, model, finish_reason = null, usage = {} } +) { const data = { id: chat.uuid ?? chat.id, object: "chat.completion", @@ -466,6 +487,7 @@ function formatJSON(chat, { chunked = false, model, finish_reason = null }) { finish_reason: finish_reason, }, ], + usage, }; return data; diff --git a/server/utils/chats/stream.js b/server/utils/chats/stream.js index bc13833631..35b0c191e6 100644 --- a/server/utils/chats/stream.js +++ b/server/utils/chats/stream.js @@ -94,6 +94,7 @@ async function streamChatWithWorkspace( // 1. Chatting in "chat" mode and may or may _not_ have embeddings // 2. Chatting in "query" mode and has at least 1 embedding let completeText; + let metrics = {}; let contextTexts = []; let sources = []; let pinnedDocIdentifiers = []; @@ -226,9 +227,13 @@ async function streamChatWithWorkspace( console.log( `\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.` ); - completeText = await LLMConnector.getChatCompletion(messages, { - temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp, - }); + const { textResponse, metrics: performanceMetrics } = + await LLMConnector.getChatCompletion(messages, { + temperature: workspace?.openAiTemp ?? 
LLMConnector.defaultTemp, + }); + + completeText = textResponse; + metrics = performanceMetrics; writeResponseChunk(response, { uuid, sources, @@ -236,6 +241,7 @@ textResponse: completeText, close: true, error: false, + metrics, }); } else { const stream = await LLMConnector.streamGetChatCompletion(messages, { @@ -245,13 +251,20 @@ uuid, sources, }); + metrics = stream.metrics; } if (completeText?.length > 0) { const { chat } = await WorkspaceChats.new({ workspaceId: workspace.id, prompt: message, - response: { text: completeText, sources, type: chatMode, attachments }, + response: { + text: completeText, + sources, + type: chatMode, + attachments, + metrics, + }, threadId: thread?.id || null, user, }); @@ -262,6 +275,7 @@ close: true, error: false, chatId: chat.id, + metrics, }); return; } @@ -271,6 +285,7 @@ type: "finalizeResponseStream", close: true, error: false, + metrics, }); return; } diff --git a/server/utils/helpers/chat/LLMPerformanceMonitor.js b/server/utils/helpers/chat/LLMPerformanceMonitor.js new file mode 100644 index 0000000000..bd02863edf --- /dev/null +++ b/server/utils/helpers/chat/LLMPerformanceMonitor.js @@ -0,0 +1,101 @@ +const { TokenManager } = require("../tiktoken"); + +/** + * @typedef {import("openai/streaming").Stream} OpenAICompatibleStream + * @typedef {(reportedUsage: {[key: string]: number, completion_tokens?: number, prompt_tokens?: number}) => StreamMetrics} EndMeasurementFunction + * @typedef {Array<{content: string}>} Messages + */ + +/** + * @typedef {Object} StreamMetrics + * @property {number} prompt_tokens - the number of tokens in the prompt + * @property {number} completion_tokens - the number of tokens in the completion + * @property {number} total_tokens - the total number of tokens + * @property {number} outputTps - the tokens per second of the output + * @property {number} duration - the duration of the stream + */ + +/** + * @typedef {Object} MonitoredStream + * @property {number} start - the start time of the stream + * @property {number} duration - the duration of the stream + * @property {StreamMetrics} metrics - the metrics of the stream + * @property {EndMeasurementFunction} endMeasurement - the method to end the stream and calculate the metrics + */ + +class LLMPerformanceMonitor { + static tokenManager = new TokenManager(); + /** + * Counts the tokens in the messages. + * @param {Array<{content: string}>} messages - the messages sent to the LLM so we can calculate the prompt tokens since most providers do not return this on stream + * @returns {number} + */ + static countTokens(messages = []) { + try { + return this.tokenManager.statsFrom(messages); + } catch (e) { + return 0; + } + } + /** + * Awaits a promise (such as a pending chat completion request) and measures the duration (in seconds) it takes to resolve. + * @param {Promise<any>} func - the promise to measure + * @returns {Promise<{output: any, duration: number}>} + */ + static measureAsyncFunction(func) { + return (async () => { + const start = Date.now(); + const output = await func; // is a promise + const end = Date.now(); + return { output, duration: (end - start) / 1000 }; + })(); + } + + /** + * Wraps a completion stream and attaches a start time and duration property to the stream. + * Also attaches an `endMeasurement` method to the stream that will calculate the duration of the stream and metrics.
+ * @param {Promise} func + * @param {Messages} messages - the messages sent to the LLM so we can calculate the prompt tokens since most providers do not return this on stream + * @param {boolean} runPromptTokenCalculation - whether to run the prompt token calculation to estimate the `prompt_tokens` metric. This is useful for providers that do not return this on stream. + * @returns {Promise} + */ + static async measureStream( + func, + messages = [], + runPromptTokenCalculation = true + ) { + const stream = await func; + stream.start = Date.now(); + stream.duration = 0; + stream.metrics = { + completion_tokens: 0, + prompt_tokens: runPromptTokenCalculation ? this.countTokens(messages) : 0, + total_tokens: 0, + outputTps: 0, + duration: 0, + }; + + stream.endMeasurement = (reportedUsage = {}) => { + const end = Date.now(); + const duration = (end - stream.start) / 1000; + + // Merge the reported usage with the existing metrics + // so the totals and TPS calculated below use the most accurate values available. + stream.metrics = { + ...stream.metrics, + ...reportedUsage, + }; + + stream.metrics.total_tokens = + stream.metrics.prompt_tokens + (stream.metrics.completion_tokens || 0); + stream.metrics.outputTps = stream.metrics.completion_tokens / duration; + stream.metrics.duration = duration; + return stream.metrics; + }; + return stream; + } +} + +module.exports = { + LLMPerformanceMonitor, +}; diff --git a/server/utils/helpers/chat/responses.js b/server/utils/helpers/chat/responses.js index 3f369f5a54..9be1d224cc 100644 --- a/server/utils/helpers/chat/responses.js +++ b/server/utils/helpers/chat/responses.js @@ -9,9 +9,29 @@ function clientAbortedHandler(resolve, fullText) { return; } +/** + * Handles the default stream response for a chat. + * @param {import("express").Response} response + * @param {import('./LLMPerformanceMonitor').MonitoredStream} stream + * @param {Object} responseProps + * @returns {Promise} + */ function handleDefaultStreamResponseV2(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; + // Why are we doing this? + // OpenAI does support sending usage metrics in the stream response, but: + // 1. This parameter is not available in our current API version (TODO: update) + // 2. The usage metrics are not available in _every_ provider that uses this function + // 3. We need to track the usage metrics for every provider that uses this function - not just OpenAI + // Other keys are added by the LLMPerformanceMonitor.measureStream method + let hasUsageMetrics = false; + let usage = { + // prompt_tokens may already be set here if the provider reports it - otherwise it is counted manually + // when the stream is created in the LLM provider's `streamGetChatCompletion` via the `LLMPerformanceMonitor.measureStream` call. + completion_tokens: 0, + }; + return new Promise(async (resolve) => { let fullText = ""; // Establish listener to early-abort a streaming response // in case things go sideways or the user does not like the response. // We preserve the generated text but continue as if chat was completed // to preserve previously generated content. - const handleAbort = () => clientAbortedHandler(resolve, fullText); + const handleAbort = () => { + stream?.endMeasurement(usage); + clientAbortedHandler(resolve, fullText); + }; response.on("close", handleAbort); // Now handle the chunks from the streamed response and append to fullText.
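For reference, a minimal usage sketch of the monitor (illustrative only, not part of this patch). The OpenAI-compatible client, the model name, and the one-token-per-chunk estimate are assumptions; the point is that `measureStream` wraps the pending stream request and `endMeasurement` derives total_tokens, outputTps, and duration from whatever usage is reported or estimated:

// Hypothetical sketch of consuming LLMPerformanceMonitor from a provider (not part of this patch).
const {
  LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");

async function exampleStreamWithMetrics(openai, messages) {
  // Wrap the pending stream request; prompt_tokens is estimated from `messages`
  // because most providers do not report usage while streaming.
  const stream = await LLMPerformanceMonitor.measureStream(
    openai.chat.completions.create({ model: "example-model", stream: true, messages }),
    messages
  );

  let completionTokens = 0;
  for await (const chunk of stream) {
    const token = chunk?.choices?.[0]?.delta?.content;
    if (token) completionTokens++; // rough estimate: one token per streamed chunk
  }

  // endMeasurement merges the reported/estimated usage and computes
  // total_tokens, outputTps (completion_tokens / duration), and duration.
  return stream.endMeasurement({ completion_tokens: completionTokens });
}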
@@ -28,8 +51,28 @@ function handleDefaultStreamResponseV2(response, stream, responseProps) { const message = chunk?.choices?.[0]; const token = message?.delta?.content; + // If we see usage metrics in the chunk, we can use them directly + // instead of estimating them, but we only assign values when the + // usage object contains the exact keys we expect. + if ( + chunk.hasOwnProperty("usage") && // exists + !!chunk.usage && // is not null + Object.values(chunk.usage).length > 0 // has values + ) { + if (chunk.usage.hasOwnProperty("prompt_tokens")) { + usage.prompt_tokens = Number(chunk.usage.prompt_tokens); + } + + if (chunk.usage.hasOwnProperty("completion_tokens")) { + hasUsageMetrics = true; // stop incrementing the estimated counter + usage.completion_tokens = Number(chunk.usage.completion_tokens); + } + } + if (token) { fullText += token; + // If we never saw a usage metric, estimate completion tokens by the number of streamed chunks + if (!hasUsageMetrics) usage.completion_tokens++; writeResponseChunk(response, { uuid, sources: [], @@ -56,6 +99,7 @@ function handleDefaultStreamResponseV2(response, stream, responseProps) { error: false, }); response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); resolve(fullText); break; // Break streaming when a valid finish_reason is first encountered } @@ -70,6 +114,7 @@ function handleDefaultStreamResponseV2(response, stream, responseProps) { close: true, error: e.message, }); + stream?.endMeasurement(usage); resolve(fullText); // Return what we currently have - if anything. } }); @@ -111,6 +156,7 @@ function convertToChatHistory(history = []) { chatId: id, sentAt: moment(createdAt).unix(), feedbackScore, + metrics: data?.metrics || {}, }, ]); } diff --git a/server/utils/helpers/index.js b/server/utils/helpers/index.js index 748e4fb1b1..55d190f4fd 100644 --- a/server/utils/helpers/index.js +++ b/server/utils/helpers/index.js @@ -6,14 +6,38 @@ * @property {string} contentString - full base64 encoded string of file */ +/** + * @typedef {Object} ResponseMetrics + * @property {number} prompt_tokens - The number of prompt tokens used + * @property {number} completion_tokens - The number of completion tokens used + * @property {number} total_tokens - The total number of tokens used + * @property {number} outputTps - The output tokens per second + * @property {number} duration - The duration of the request in seconds + * + * @typedef {Object} ChatMessage + * @property {string} role - The role of the message sender (e.g. 'user', 'assistant', 'system') + * @property {string} content - The content of the message + * + * @typedef {Object} ChatCompletionResponse + * @property {string} textResponse - The text response from the LLM + * @property {ResponseMetrics} metrics - The response metrics + * + * @typedef {Object} ChatCompletionOptions + * @property {number} temperature - The sampling temperature for the LLM response + * + * @typedef {function(Array, ChatCompletionOptions): Promise} getChatCompletionFunction + * + * @typedef {function(Array, ChatCompletionOptions): Promise} streamGetChatCompletionFunction + */ + /** * @typedef {Object} BaseLLMProvider - A basic llm provider object * @property {Function} streamingEnabled - Checks if streaming is enabled for chat completions. * @property {Function} promptWindowLimit - Returns the token limit for the current model. * @property {Function} isValidChatCompletionModel - Validates if the provided model is suitable for chat completion.
* @property {Function} constructPrompt - Constructs a formatted prompt for the chat completion request. - * @property {Function} getChatCompletion - Gets a chat completion response from OpenAI. - * @property {Function} streamGetChatCompletion - Streams a chat completion response from OpenAI. + * @property {getChatCompletionFunction} getChatCompletion - Gets a chat completion response from OpenAI. + * @property {streamGetChatCompletionFunction} streamGetChatCompletion - Streams a chat completion response from OpenAI. * @property {Function} handleStream - Handles the streaming response. * @property {Function} embedTextInput - Embeds the provided text input using the specified embedder. * @property {Function} embedChunks - Embeds multiple chunks of text using the specified embedder.