Skip to content

Commit

Permalink
Merge pull request adamcohenhillel#87 from makeiteasierapps/stream-fe…
Browse files Browse the repository at this point in the history
…ature

Stream feature
  • Loading branch information
adamcohenhillel authored Mar 8, 2024
2 parents adbde96 + f9952e7 commit e6f7ddb
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 135 deletions.
121 changes: 87 additions & 34 deletions app/src/components/Chat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { SupabaseClient } from '@supabase/supabase-js';
import { useMutation, useQuery } from '@tanstack/react-query';
import { useEffect, useRef, useState } from 'react';
import { toast } from 'sonner';
import { useSupabaseConfig } from '../utils/useSupabaseConfig';
import ChatLog, { Message } from './ChatLog';
import LogoutButton from './LogoutButton';
import { NavMenu } from './NavMenu';
Expand All @@ -22,47 +23,99 @@ export default function Chat({
const [messages, setMessages] = useState<Message[]>([]);
const [conversationId, setConversationId] = useState<number | null>(null);
const [waitingForResponse, setWaitingForResponse] = useState(false);
const [isStreaming, setIsStreaming] = useState(false);

const { supabaseUrl, supabaseToken } = useSupabaseConfig();

const sendMessageAndReceiveResponse = useMutation({
mutationFn: async (userMessage: Message) => {
const { data: sendMessageData, error: sendMessageError } =
await supabaseClient
.from('conversations')
.update({ context: [...messages, userMessage] })
.eq('id', conversationId);

if (sendMessageError) throw sendMessageError;

setMessages([...messages, userMessage]);
setWaitingForResponse(true);

const { data: aiResponseData, error: aiResponseError } =
await supabaseClient.functions.invoke('chat', {
body: { messageHistory: [...messages, userMessage] },
});

if (aiResponseError) throw aiResponseError;

const { data: updateConversationData, error: updateConversationError } =
await supabaseClient
.from('conversations')
.update({ context: [...messages, userMessage, aiResponseData.msg] })
.eq('id', conversationId);

if (updateConversationError) throw updateConversationError;

return aiResponseData;
},
onError: (error) => {
toast.error(error.message || 'Unknown error');
setWaitingForResponse(false);
},
onSuccess: (aiResponse) => {
setMessages((currentMessages) => {
return [...currentMessages, aiResponse.msg as Message];
// Invoke the function and get the response as a ReadableStream
const url = `${supabaseUrl}/functions/v1/chat`;
const headers = {
Authorization: `Bearer ${supabaseToken}`,
'Content-Type': 'application/json',
};
const response = await fetch(url, {
method: 'POST',
headers: headers,
body: JSON.stringify({
messageHistory: [...messages, userMessage],
timestamp: new Date().toISOString(),
}),
});

setWaitingForResponse(false);
if (!response.ok) {
throw new Error('Network response was not ok');
}

if (response.body) {
const reader = response.body.getReader();
setWaitingForResponse(false);
setIsStreaming(true);

try {
let completeResponse = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunks = new TextDecoder().decode(value).split('\n');
for (const chunk of chunks) {
if (chunk) {
const aiResponse = JSON.parse(chunk);
if (aiResponse.message) {
completeResponse += aiResponse.message;
setMessages((prevMessages) => {
const updatedMessages = [...prevMessages];
const lastMessageIndex = updatedMessages.length - 1;
if (
lastMessageIndex >= 0 &&
updatedMessages[lastMessageIndex].role === 'assistant'
) {
// If the last message is from the assistant, update its content
updatedMessages[lastMessageIndex] = {
...updatedMessages[lastMessageIndex],
content:
updatedMessages[lastMessageIndex].content +
aiResponse.message,
};
} else {
// Otherwise, add a new message from the assistant
updatedMessages.push({
role: 'assistant',
content: aiResponse.message,
});
}
return updatedMessages;
});
}
}
}

const updatedContext = [
...messages,
userMessage,
{ role: 'assistant', content: completeResponse },
];
const updateResponse = await supabaseClient
.from('conversations')
.update({ context: updatedContext })
.eq('id', conversationId);
if (updateResponse.error) {
throw updateResponse.error;
}
}
} catch (error) {
console.error('Stream reading failed', error);
setIsStreaming(false);
setWaitingForResponse(false);
} finally {
reader.releaseLock();
}
}
setIsStreaming(false);
},
});

Expand Down Expand Up @@ -173,7 +226,7 @@ export default function Chat({
textareaRef={textareaRef}
entryData={entryData}
setEntryData={setEntryData}
waitingForResponse={waitingForResponse}
isDisabled={isStreaming || waitingForResponse}
sendMessage={() => {
if (!entryData.trim()) return;
const userMessage = { role: 'user', content: entryData.trim() };
Expand Down
8 changes: 4 additions & 4 deletions app/src/components/PromptForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ export default function PromptForm({
textareaRef,
entryData,
setEntryData,
waitingForResponse,
isDisabled,
sendMessage,
}: {
textareaRef: React.RefObject<HTMLTextAreaElement>;
entryData: string;
setEntryData: React.Dispatch<React.SetStateAction<string>>;
waitingForResponse: boolean;
isDisabled: boolean;
sendMessage: () => void;
}) {
return (
Expand All @@ -35,13 +35,13 @@ export default function PromptForm({
textareaRef.current.style.height = `${textareaRef.current.scrollHeight}px`;
}
}}
disabled={waitingForResponse}
disabled={isDisabled}
placeholder="What is on your mind?"
></textarea>
<Button
size={'icon'}
className="disabled:bg-muted/40 relative bottom-0 right-2 ml-auto rounded-lg transition-colors disabled:cursor-not-allowed disabled:opacity-50"
disabled={waitingForResponse || entryData.length == 0}
disabled={isDisabled || entryData.length === 0}
onClick={sendMessage}
>
<SendHorizontal size={20} />
Expand Down
Loading

0 comments on commit e6f7ddb

Please sign in to comment.