From aca59406508d4ef7d1f917fc319419ae8081f2b5 Mon Sep 17 00:00:00 2001 From: Timothy Carambat Date: Wed, 7 Feb 2024 08:15:14 -0800 Subject: [PATCH] Refactor handleStream to LLM Classes (#685) --- server/utils/AiProviders/azureOpenAi/index.js | 38 ++- server/utils/AiProviders/gemini/index.js | 34 +- server/utils/AiProviders/huggingface/index.js | 113 ++++++- server/utils/AiProviders/lmStudio/index.js | 5 + server/utils/AiProviders/localAi/index.js | 5 + server/utils/AiProviders/mistral/index.js | 5 + server/utils/AiProviders/native/index.js | 36 +++ server/utils/AiProviders/ollama/index.js | 36 +++ server/utils/AiProviders/openAi/index.js | 5 + server/utils/AiProviders/togetherAi/index.js | 97 +++++- server/utils/chats/embed.js | 6 +- server/utils/chats/stream.js | 301 +----------------- 12 files changed, 374 insertions(+), 307 deletions(-) diff --git a/server/utils/AiProviders/azureOpenAi/index.js b/server/utils/AiProviders/azureOpenAi/index.js index 639ac102ed..eac47f0efb 100644 --- a/server/utils/AiProviders/azureOpenAi/index.js +++ b/server/utils/AiProviders/azureOpenAi/index.js @@ -1,5 +1,6 @@ const { AzureOpenAiEmbedder } = require("../../EmbeddingEngines/azureOpenAi"); const { chatPrompt } = require("../../chats"); +const { writeResponseChunk } = require("../../chats/stream"); class AzureOpenAiLLM { constructor(embedder = null, _modelPreference = null) { @@ -135,7 +136,7 @@ class AzureOpenAiLLM { n: 1, } ); - return { type: "azureStream", stream }; + return stream; } async getChatCompletion(messages = [], { temperature = 0.7 }) { @@ -165,7 +166,40 @@ class AzureOpenAiLLM { n: 1, } ); - return { type: "azureStream", stream }; + return stream; + } + + handleStream(response, stream, responseProps) { + const { uuid = uuidv4(), sources = [] } = responseProps; + + return new Promise(async (resolve) => { + let fullText = ""; + for await (const event of stream) { + for (const choice of event.choices) { + const delta = choice.delta?.content; + if (!delta) continue; + fullText += delta; + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: delta, + close: false, + error: false, + }); + } + } + + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + }); } // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations diff --git a/server/utils/AiProviders/gemini/index.js b/server/utils/AiProviders/gemini/index.js index 63549fb8dd..36c63df722 100644 --- a/server/utils/AiProviders/gemini/index.js +++ b/server/utils/AiProviders/gemini/index.js @@ -1,4 +1,5 @@ const { chatPrompt } = require("../../chats"); +const { writeResponseChunk } = require("../../chats/stream"); class GeminiLLM { constructor(embedder = null, modelPreference = null) { @@ -164,7 +165,7 @@ class GeminiLLM { if (!responseStream.stream) throw new Error("Could not stream response stream from Gemini."); - return { type: "geminiStream", ...responseStream }; + return responseStream.stream; } async streamGetChatCompletion(messages = [], _opts = {}) { @@ -183,7 +184,7 @@ class GeminiLLM { if (!responseStream.stream) throw new Error("Could not stream response stream from Gemini."); - return { type: "geminiStream", ...responseStream }; + return responseStream.stream; } async compressMessages(promptArgs = {}, rawHistory = []) { @@ -192,6 +193,35 @@ class GeminiLLM { return await messageArrayCompressor(this, messageArray, rawHistory); } + handleStream(response, 
stream, responseProps) { + const { uuid = uuidv4(), sources = [] } = responseProps; + + return new Promise(async (resolve) => { + let fullText = ""; + for await (const chunk of stream) { + fullText += chunk.text(); + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: chunk.text(), + close: false, + error: false, + }); + } + + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + }); + } + // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations async embedTextInput(textInput) { return await this.embedder.embedTextInput(textInput); diff --git a/server/utils/AiProviders/huggingface/index.js b/server/utils/AiProviders/huggingface/index.js index 4faf9b30f0..8fcc2b47ef 100644 --- a/server/utils/AiProviders/huggingface/index.js +++ b/server/utils/AiProviders/huggingface/index.js @@ -1,6 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { OpenAiEmbedder } = require("../../EmbeddingEngines/openAi"); const { chatPrompt } = require("../../chats"); +const { writeResponseChunk } = require("../../chats/stream"); class HuggingFaceLLM { constructor(embedder = null, _modelPreference = null) { @@ -138,7 +139,7 @@ class HuggingFaceLLM { }, { responseType: "stream" } ); - return { type: "huggingFaceStream", stream: streamRequest }; + return streamRequest; } async getChatCompletion(messages = null, { temperature = 0.7 }) { @@ -162,7 +163,115 @@ class HuggingFaceLLM { }, { responseType: "stream" } ); - return { type: "huggingFaceStream", stream: streamRequest }; + return streamRequest; + } + + handleStream(response, stream, responseProps) { + const { uuid = uuidv4(), sources = [] } = responseProps; + + return new Promise((resolve) => { + let fullText = ""; + let chunk = ""; + stream.data.on("data", (data) => { + const lines = data + ?.toString() + ?.split("\n") + .filter((line) => line.trim() !== ""); + + for (const line of lines) { + let validJSON = false; + const message = chunk + line.replace(/^data:/, ""); + if (message !== "[DONE]") { + // JSON chunk is incomplete and has not ended yet + // so we need to stitch it together. You would think JSON + // chunks would only come complete - but they don't! + try { + JSON.parse(message); + validJSON = true; + } catch { + console.log("Failed to parse message", message); + } + + if (!validJSON) { + // It can be possible that the chunk decoding is running away + // and the message chunk fails to append due to string length. + // In this case abort the chunk and reset so we can continue. 
+ // ref: https://github.com/Mintplex-Labs/anything-llm/issues/416 + try { + chunk += message; + } catch (e) { + console.error(`Chunk appending error`, e); + chunk = ""; + } + continue; + } else { + chunk = ""; + } + } + + if (message == "[DONE]") { + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + } else { + let error = null; + let finishReason = null; + let token = ""; + try { + const json = JSON.parse(message); + error = json?.error || null; + token = json?.choices?.[0]?.delta?.content; + finishReason = json?.choices?.[0]?.finish_reason || null; + } catch { + continue; + } + + if (!!error) { + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: null, + close: true, + error, + }); + resolve(""); + return; + } + + if (token) { + fullText += token; + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: token, + close: false, + error: false, + }); + } + + if (finishReason !== null) { + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + } + } + } + }); + }); } // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations diff --git a/server/utils/AiProviders/lmStudio/index.js b/server/utils/AiProviders/lmStudio/index.js index 08950a7b96..fe8064b689 100644 --- a/server/utils/AiProviders/lmStudio/index.js +++ b/server/utils/AiProviders/lmStudio/index.js @@ -1,4 +1,5 @@ const { chatPrompt } = require("../../chats"); +const { handleDefaultStreamResponse } = require("../../chats/stream"); // hybrid of openAi LLM chat completion for LMStudio class LMStudioLLM { @@ -174,6 +175,10 @@ class LMStudioLLM { return streamRequest; } + handleStream(response, stream, responseProps) { + return handleDefaultStreamResponse(response, stream, responseProps); + } + // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations async embedTextInput(textInput) { return await this.embedder.embedTextInput(textInput); diff --git a/server/utils/AiProviders/localAi/index.js b/server/utils/AiProviders/localAi/index.js index 6d265cf828..2717c47f3a 100644 --- a/server/utils/AiProviders/localAi/index.js +++ b/server/utils/AiProviders/localAi/index.js @@ -1,4 +1,5 @@ const { chatPrompt } = require("../../chats"); +const { handleDefaultStreamResponse } = require("../../chats/stream"); class LocalAiLLM { constructor(embedder = null, modelPreference = null) { @@ -174,6 +175,10 @@ class LocalAiLLM { return streamRequest; } + handleStream(response, stream, responseProps) { + return handleDefaultStreamResponse(response, stream, responseProps); + } + // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations async embedTextInput(textInput) { return await this.embedder.embedTextInput(textInput); diff --git a/server/utils/AiProviders/mistral/index.js b/server/utils/AiProviders/mistral/index.js index a25185c763..785a8dd085 100644 --- a/server/utils/AiProviders/mistral/index.js +++ b/server/utils/AiProviders/mistral/index.js @@ -1,4 +1,5 @@ const { chatPrompt } = require("../../chats"); +const { handleDefaultStreamResponse } = require("../../chats/stream"); class MistralLLM { constructor(embedder = null, modelPreference = null) { @@ -164,6 +165,10 @@ class MistralLLM { return streamRequest; } + handleStream(response, stream, responseProps) { + 
return handleDefaultStreamResponse(response, stream, responseProps); + } + // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations async embedTextInput(textInput) { return await this.embedder.embedTextInput(textInput); diff --git a/server/utils/AiProviders/native/index.js b/server/utils/AiProviders/native/index.js index de1a97f3d7..4b96d02ef1 100644 --- a/server/utils/AiProviders/native/index.js +++ b/server/utils/AiProviders/native/index.js @@ -2,6 +2,7 @@ const fs = require("fs"); const path = require("path"); const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { chatPrompt } = require("../../chats"); +const { writeResponseChunk } = require("../../chats/stream"); // Docs: https://api.js.langchain.com/classes/chat_models_llama_cpp.ChatLlamaCpp.html const ChatLlamaCpp = (...args) => @@ -170,6 +171,41 @@ class NativeLLM { return responseStream; } + handleStream(response, stream, responseProps) { + const { uuid = uuidv4(), sources = [] } = responseProps; + + return new Promise(async (resolve) => { + let fullText = ""; + for await (const chunk of stream) { + if (chunk === undefined) + throw new Error( + "Stream returned undefined chunk. Aborting reply - check model provider logs." + ); + + const content = chunk.hasOwnProperty("content") ? chunk.content : chunk; + fullText += content; + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: content, + close: false, + error: false, + }); + } + + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + }); + } + // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations async embedTextInput(textInput) { return await this.embedder.embedTextInput(textInput); diff --git a/server/utils/AiProviders/ollama/index.js b/server/utils/AiProviders/ollama/index.js index af7fe8210f..9a16d245af 100644 --- a/server/utils/AiProviders/ollama/index.js +++ b/server/utils/AiProviders/ollama/index.js @@ -1,5 +1,6 @@ const { chatPrompt } = require("../../chats"); const { StringOutputParser } = require("langchain/schema/output_parser"); +const { writeResponseChunk } = require("../../chats/stream"); // Docs: https://github.com/jmorganca/ollama/blob/main/docs/api.md class OllamaAILLM { @@ -165,6 +166,41 @@ class OllamaAILLM { return stream; } + handleStream(response, stream, responseProps) { + const { uuid = uuidv4(), sources = [] } = responseProps; + + return new Promise(async (resolve) => { + let fullText = ""; + for await (const chunk of stream) { + if (chunk === undefined) + throw new Error( + "Stream returned undefined chunk. Aborting reply - check model provider logs." + ); + + const content = chunk.hasOwnProperty("content") ? 
chunk.content : chunk; + fullText += content; + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: content, + close: false, + error: false, + }); + } + + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + }); + } + // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations async embedTextInput(textInput) { return await this.embedder.embedTextInput(textInput); diff --git a/server/utils/AiProviders/openAi/index.js b/server/utils/AiProviders/openAi/index.js index 120e728448..dffd368241 100644 --- a/server/utils/AiProviders/openAi/index.js +++ b/server/utils/AiProviders/openAi/index.js @@ -1,5 +1,6 @@ const { OpenAiEmbedder } = require("../../EmbeddingEngines/openAi"); const { chatPrompt } = require("../../chats"); +const { handleDefaultStreamResponse } = require("../../chats/stream"); class OpenAiLLM { constructor(embedder = null, modelPreference = null) { @@ -222,6 +223,10 @@ class OpenAiLLM { return streamRequest; } + handleStream(response, stream, responseProps) { + return handleDefaultStreamResponse(response, stream, responseProps); + } + // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations async embedTextInput(textInput) { return await this.embedder.embedTextInput(textInput); diff --git a/server/utils/AiProviders/togetherAi/index.js b/server/utils/AiProviders/togetherAi/index.js index 341661f8db..7291622a73 100644 --- a/server/utils/AiProviders/togetherAi/index.js +++ b/server/utils/AiProviders/togetherAi/index.js @@ -1,4 +1,5 @@ const { chatPrompt } = require("../../chats"); +const { writeResponseChunk } = require("../../chats/stream"); function togetherAiModels() { const { MODELS } = require("./models.js"); @@ -141,7 +142,7 @@ class TogetherAiLLM { }, { responseType: "stream" } ); - return { type: "togetherAiStream", stream: streamRequest }; + return streamRequest; } async getChatCompletion(messages = null, { temperature = 0.7 }) { @@ -175,7 +176,99 @@ class TogetherAiLLM { }, { responseType: "stream" } ); - return { type: "togetherAiStream", stream: streamRequest }; + return streamRequest; + } + + handleStream(response, stream, responseProps) { + const { uuid = uuidv4(), sources = [] } = responseProps; + + return new Promise((resolve) => { + let fullText = ""; + let chunk = ""; + stream.data.on("data", (data) => { + const lines = data + ?.toString() + ?.split("\n") + .filter((line) => line.trim() !== ""); + + for (const line of lines) { + let validJSON = false; + const message = chunk + line.replace(/^data: /, ""); + + if (message !== "[DONE]") { + // JSON chunk is incomplete and has not ended yet + // so we need to stitch it together. You would think JSON + // chunks would only come complete - but they don't! + try { + JSON.parse(message); + validJSON = true; + } catch {} + + if (!validJSON) { + // It can be possible that the chunk decoding is running away + // and the message chunk fails to append due to string length. + // In this case abort the chunk and reset so we can continue. 
+ // ref: https://github.com/Mintplex-Labs/anything-llm/issues/416 + try { + chunk += message; + } catch (e) { + console.error(`Chunk appending error`, e); + chunk = ""; + } + continue; + } else { + chunk = ""; + } + } + + if (message == "[DONE]") { + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + } else { + let finishReason = null; + let token = ""; + try { + const json = JSON.parse(message); + token = json?.choices?.[0]?.delta?.content; + finishReason = json?.choices?.[0]?.finish_reason || null; + } catch { + continue; + } + + if (token) { + fullText += token; + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: token, + close: false, + error: false, + }); + } + + if (finishReason !== null) { + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + } + } + } + }); + }); } // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations diff --git a/server/utils/chats/embed.js b/server/utils/chats/embed.js index 30fc524d3f..5a7b93b49d 100644 --- a/server/utils/chats/embed.js +++ b/server/utils/chats/embed.js @@ -1,7 +1,7 @@ const { v4: uuidv4 } = require("uuid"); const { getVectorDbClass, getLLMProvider } = require("../helpers"); const { chatPrompt, convertToPromptHistory } = require("."); -const { writeResponseChunk, handleStreamResponses } = require("./stream"); +const { writeResponseChunk } = require("./stream"); const { EmbedChats } = require("../../models/embedChats"); async function streamChatWithForEmbed( @@ -150,7 +150,7 @@ async function streamChatWithForEmbed( const stream = await LLMConnector.streamGetChatCompletion(messages, { temperature: embed.workspace?.openAiTemp ?? LLMConnector.defaultTemp, }); - completeText = await handleStreamResponses(response, stream, { + completeText = await LLMConnector.handleStream(response, stream, { uuid, sources: [], }); @@ -227,7 +227,7 @@ async function streamEmptyEmbeddingChat({ embed.workspace, rawHistory ); - completeText = await handleStreamResponses(response, stream, { + completeText = await LLMConnector.handleStream(response, stream, { uuid, sources: [], }); diff --git a/server/utils/chats/stream.js b/server/utils/chats/stream.js index 0ee448a5ee..d16f6e6029 100644 --- a/server/utils/chats/stream.js +++ b/server/utils/chats/stream.js @@ -156,7 +156,7 @@ async function streamChatWithWorkspace( const stream = await LLMConnector.streamGetChatCompletion(messages, { temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp, }); - completeText = await handleStreamResponses(response, stream, { + completeText = await LLMConnector.handleStream(response, stream, { uuid, sources, }); @@ -214,7 +214,7 @@ async function streamEmptyEmbeddingChat({ workspace, rawHistory ); - completeText = await handleStreamResponses(response, stream, { + completeText = await LLMConnector.handleStream(response, stream, { uuid, sources: [], }); @@ -229,301 +229,10 @@ async function streamEmptyEmbeddingChat({ return; } -// TODO: Refactor this implementation -function handleStreamResponses(response, stream, responseProps) { +// The default way to handle a stream response. Functions best with OpenAI. 
+function handleDefaultStreamResponse(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; - // Gemini likes to return a stream asyncIterator which will - // be a totally different object than other models. - if (stream?.type === "geminiStream") { - return new Promise(async (resolve) => { - let fullText = ""; - for await (const chunk of stream.stream) { - fullText += chunk.text(); - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: chunk.text(), - close: false, - error: false, - }); - } - - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - resolve(fullText); - }); - } - - if (stream?.type === "azureStream") { - return new Promise(async (resolve) => { - let fullText = ""; - for await (const event of stream.stream) { - for (const choice of event.choices) { - const delta = choice.delta?.content; - if (!delta) continue; - fullText += delta; - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: delta, - close: false, - error: false, - }); - } - } - - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - resolve(fullText); - }); - } - - if (stream.type === "togetherAiStream") { - return new Promise((resolve) => { - let fullText = ""; - let chunk = ""; - stream.stream.data.on("data", (data) => { - const lines = data - ?.toString() - ?.split("\n") - .filter((line) => line.trim() !== ""); - - for (const line of lines) { - let validJSON = false; - const message = chunk + line.replace(/^data: /, ""); - - if (message !== "[DONE]") { - // JSON chunk is incomplete and has not ended yet - // so we need to stitch it together. You would think JSON - // chunks would only come complete - but they don't! - try { - JSON.parse(message); - validJSON = true; - } catch {} - - if (!validJSON) { - // It can be possible that the chunk decoding is running away - // and the message chunk fails to append due to string length. - // In this case abort the chunk and reset so we can continue. 
- // ref: https://github.com/Mintplex-Labs/anything-llm/issues/416 - try { - chunk += message; - } catch (e) { - console.error(`Chunk appending error`, e); - chunk = ""; - } - continue; - } else { - chunk = ""; - } - } - - if (message == "[DONE]") { - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - resolve(fullText); - } else { - let finishReason = null; - let token = ""; - try { - const json = JSON.parse(message); - token = json?.choices?.[0]?.delta?.content; - finishReason = json?.choices?.[0]?.finish_reason || null; - } catch { - continue; - } - - if (token) { - fullText += token; - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: token, - close: false, - error: false, - }); - } - - if (finishReason !== null) { - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - resolve(fullText); - } - } - } - }); - }); - } - - if (stream.type === "huggingFaceStream") { - return new Promise((resolve) => { - let fullText = ""; - let chunk = ""; - stream.stream.data.on("data", (data) => { - const lines = data - ?.toString() - ?.split("\n") - .filter((line) => line.trim() !== ""); - - for (const line of lines) { - let validJSON = false; - const message = chunk + line.replace(/^data:/, ""); - if (message !== "[DONE]") { - // JSON chunk is incomplete and has not ended yet - // so we need to stitch it together. You would think JSON - // chunks would only come complete - but they don't! - try { - JSON.parse(message); - validJSON = true; - } catch { - console.log("Failed to parse message", message); - } - - if (!validJSON) { - // It can be possible that the chunk decoding is running away - // and the message chunk fails to append due to string length. - // In this case abort the chunk and reset so we can continue. - // ref: https://github.com/Mintplex-Labs/anything-llm/issues/416 - try { - chunk += message; - } catch (e) { - console.error(`Chunk appending error`, e); - chunk = ""; - } - continue; - } else { - chunk = ""; - } - } - - if (message == "[DONE]") { - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - resolve(fullText); - } else { - let error = null; - let finishReason = null; - let token = ""; - try { - const json = JSON.parse(message); - error = json?.error || null; - token = json?.choices?.[0]?.delta?.content; - finishReason = json?.choices?.[0]?.finish_reason || null; - } catch { - continue; - } - - if (!!error) { - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: null, - close: true, - error, - }); - resolve(""); - return; - } - - if (token) { - fullText += token; - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: token, - close: false, - error: false, - }); - } - - if (finishReason !== null) { - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - resolve(fullText); - } - } - } - }); - }); - } - - // If stream is not a regular OpenAI Stream (like if using native model, Ollama, or most LangChain interfaces) - // we can just iterate the stream content instead. 
- if (!stream.hasOwnProperty("data")) { - return new Promise(async (resolve) => { - let fullText = ""; - for await (const chunk of stream) { - if (chunk === undefined) - throw new Error( - "Stream returned undefined chunk. Aborting reply - check model provider logs." - ); - - const content = chunk.hasOwnProperty("content") ? chunk.content : chunk; - fullText += content; - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: content, - close: false, - error: false, - }); - } - - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - resolve(fullText); - }); - } - return new Promise((resolve) => { let fullText = ""; let chunk = ""; @@ -615,5 +324,5 @@ module.exports = { VALID_CHAT_MODE, streamChatWithWorkspace, writeResponseChunk, - handleStreamResponses, + handleDefaultStreamResponse, };
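For reference, a minimal sketch of the contract every provider's handleStream now satisfies after this refactor: stream partial tokens to the client with writeResponseChunk, emit a final close chunk carrying the citation sources, and resolve the accumulated text so the caller can persist the chat. The ExampleLLM class name and its relative require path are hypothetical; the helpers (writeResponseChunk, handleDefaultStreamResponse), the chunk shape, and the token handling mirror the Ollama/native implementations and the exports from server/utils/chats/stream.js in the patch above. Providers whose streams are OpenAI-compatible SSE can instead delegate to handleDefaultStreamResponse, as the openAi, lmStudio, localAi, and mistral classes do.

    // Hypothetical provider class, assuming placement under server/utils/AiProviders/<provider>/
    const { v4: uuidv4 } = require("uuid");
    const {
      writeResponseChunk,
      handleDefaultStreamResponse,
    } = require("../../chats/stream");

    class ExampleLLM {
      // For a stream that is a plain async iterator (as with Ollama or the native provider):
      handleStream(response, stream, responseProps) {
        const { uuid = uuidv4(), sources = [] } = responseProps;

        return new Promise(async (resolve) => {
          let fullText = "";
          for await (const chunk of stream) {
            // Chunks may be raw strings or objects with a `content` field.
            const token = chunk?.hasOwnProperty("content") ? chunk.content : chunk;
            if (!token) continue;
            fullText += token;
            // Partial token: close=false and no sources yet.
            writeResponseChunk(response, {
              uuid,
              sources: [],
              type: "textResponseChunk",
              textResponse: token,
              close: false,
              error: false,
            });
          }

          // Final chunk: close the stream and attach the citation sources.
          writeResponseChunk(response, {
            uuid,
            sources,
            type: "textResponseChunk",
            textResponse: "",
            close: true,
            error: false,
          });
          resolve(fullText);
        });
      }

      // For an OpenAI-compatible SSE stream, delegation is enough:
      // handleStream(response, stream, responseProps) {
      //   return handleDefaultStreamResponse(response, stream, responseProps);
      // }
    }

Because each class now owns its own stream handling, the callers in chats/stream.js and chats/embed.js no longer branch on a stream "type" tag and simply await LLMConnector.handleStream(response, stream, { uuid, sources }).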