diff --git a/examples/example-api.ts b/examples/example-api.ts index cb9d0de9..8a835a2a 100644 --- a/examples/example-api.ts +++ b/examples/example-api.ts @@ -9,7 +9,7 @@ import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole } from "../packages/core/src/core/types"; -import { RoomManager } from "../packages/core/src/core/room-manager"; +import { ConversationManager } from "../packages/core/src/core/conversation-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { ResearchQuantProcessor } from "../packages/core/src/core/processors/research-processor"; @@ -22,6 +22,7 @@ import { z } from "zod"; import readline from "readline"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; +import { makeFlowLifecycle } from "../packages/core/src/core/life-cycle"; async function main() { const loglevel = LogLevel.DEBUG; @@ -33,7 +34,7 @@ async function main() { await vectorDb.purge(); // Clear previous session data - const roomManager = new RoomManager(vectorDb); + const conversationManager = new ConversationManager(vectorDb); // Research client const researchClient = new LLMClient({ @@ -52,22 +53,16 @@ async function main() { loglevel ); - // Initialize processor with default character personality - const messageProcessor = new MessageProcessor( - llmClient, - defaultCharacter, - loglevel - ); - - const researchProcessor = new ResearchQuantProcessor( - researchClient, - defaultCharacter, - loglevel, - 1000 // chunk size, depends - ); - // Add processors to the master processor - masterProcessor.addProcessor([messageProcessor, researchProcessor]); + masterProcessor.addProcessor([ + new MessageProcessor(llmClient, defaultCharacter, loglevel), + new ResearchQuantProcessor( + researchClient, + defaultCharacter, + loglevel, + 1000 // chunk size, depends + ), + ]); const scheduledTaskDb = new MongoDb( "mongodb://localhost:27017", @@ -80,12 +75,9 @@ async function main() { await scheduledTaskDb.deleteAll(); - // Initialize core system const orchestrator = new Orchestrator( - roomManager, - vectorDb, masterProcessor, - scheduledTaskDb, + makeFlowLifecycle(scheduledTaskDb, conversationManager), { level: loglevel, enableColors: true, @@ -94,7 +86,7 @@ async function main() { ); // Initialize autonomous thought generation - const consciousness = new Consciousness(llmClient, roomManager, { + const consciousness = new Consciousness(llmClient, conversationManager, { intervalMs: 300000, // Think every 5 minutes minConfidence: 0.7, logLevel: loglevel, @@ -220,12 +212,12 @@ async function main() { const outputs: any = await orchestrator.dispatchToInput( "user_chat", { - headers: { - "x-user-id": userId, - }, - }, - userMessage, - userId + userId, + platformId: "discord", + threadId: "123", + data: { content: userMessage }, + contentId: "123", + } ); // Now `outputs` is an array of suggestions with role=output that got triggered diff --git a/examples/example-basic.ts b/examples/example-basic.ts index 9731044d..fa87e74b 100644 --- a/examples/example-basic.ts +++ b/examples/example-basic.ts @@ -88,7 +88,9 @@ async function main() { role: HandlerRole.OUTPUT, execute: async (data: any) => { const result = await starknetChain.write(data.payload); - return `Transaction: ${JSON.stringify(result, null, 2)}`; + return { + content: `Transaction: ${JSON.stringify(result, null, 2)}`, + }; }, outputSchema: z .object({ @@ -123,7 +125,9 @@ async function main() { `query: ${query}`, `result: ${JSON.stringify(result, null, 2)}`, ].join("\n\n"); - return `GraphQL data fetched successfully: ${resultStr}`; + return { + content: `GraphQL data fetched successfully: ${resultStr}`, + }; }, outputSchema: z .object({ diff --git a/examples/example-discord.ts b/examples/example-discord.ts index 6007382d..b5aa3f46 100644 --- a/examples/example-discord.ts +++ b/examples/example-discord.ts @@ -7,18 +7,17 @@ import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole, LogLevel } from "../packages/core/src/core/types"; import { DiscordClient } from "../packages/core/src/core/io/discord"; -import { RoomManager } from "../packages/core/src/core/room-manager"; +import { ConversationManager } from "../packages/core/src/core/conversation-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { LLMClient } from "../packages/core/src/core/llm-client"; import { env } from "../packages/core/src/core/env"; import chalk from "chalk"; import { defaultCharacter } from "../packages/core/src/core/character"; -import { z } from "zod"; import readline from "readline"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; -import { Message } from "discord.js"; import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; +import { makeFlowLifecycle } from "../packages/core/src/core/life-cycle"; async function main() { // Set logging level as you see fit @@ -33,10 +32,10 @@ async function main() { // Optional: Purge previous session data if you want a fresh start await vectorDb.purge(); - const roomManager = new RoomManager(vectorDb); + const conversationManager = new ConversationManager(vectorDb); const llmClient = new LLMClient({ - model: "anthropic/claude-3-5-sonnet-latest", // Example model + model: "anthropic/claude-3-5-sonnet-latest", temperature: 0.3, }); @@ -46,33 +45,26 @@ async function main() { loglevel ); - // Initialize processor with default character personality - const messageProcessor = new MessageProcessor( - llmClient, - defaultCharacter, - loglevel + masterProcessor.addProcessor( + new MessageProcessor(llmClient, defaultCharacter, loglevel) ); - masterProcessor.addProcessor(messageProcessor); - // Connect to MongoDB (for scheduled tasks, if you use them) - const scheduledTaskDb = new MongoDb( + const KVDB = new MongoDb( "mongodb://localhost:27017", "myApp", "scheduled_tasks" ); - await scheduledTaskDb.connect(); + await KVDB.connect(); console.log(chalk.green("✅ Scheduled task database connected")); // Clear any existing tasks if you like - await scheduledTaskDb.deleteAll(); + await KVDB.deleteAll(); // Create the Orchestrator const core = new Orchestrator( - roomManager, - vectorDb, masterProcessor, - scheduledTaskDb, + makeFlowLifecycle(KVDB, conversationManager), { level: loglevel, enableColors: true, @@ -96,9 +88,7 @@ async function main() { name: "discord_stream", role: HandlerRole.INPUT, subscribe: (onData) => { - discord.startMessageStream((incomingMessage: Message) => { - onData(incomingMessage); - }); + discord.startMessageStream(onData); return () => { discord.stopMessageStream(); }; diff --git a/examples/example-goal.ts b/examples/example-goal.ts index 5c5a80ad..e9bc4e9b 100644 --- a/examples/example-goal.ts +++ b/examples/example-goal.ts @@ -106,11 +106,13 @@ async function main() { role: HandlerRole.OUTPUT, execute: async (data: any) => { const result = await starknetChain.write(data.payload); - return `Transaction executed successfully: ${JSON.stringify( - result, - null, - 2 - )}`; + return { + content: `Transaction executed successfully: ${JSON.stringify( + result, + null, + 2 + )}`, + }; }, outputSchema: z .object({ @@ -145,7 +147,9 @@ async function main() { `query: ${query}`, `result: ${JSON.stringify(result, null, 2)}`, ].join("\n\n"); - return `GraphQL data fetched successfully: ${resultStr}`; + return { + content: `GraphQL data fetched successfully: ${resultStr}`, + }; }, outputSchema: z .object({ diff --git a/examples/example-server.ts b/examples/example-server.ts index 02eed6ed..edac04b8 100644 --- a/examples/example-server.ts +++ b/examples/example-server.ts @@ -12,24 +12,25 @@ import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole } from "../packages/core/src/core/types"; -import { RoomManager } from "../packages/core/src/core/room-manager"; +import { ConversationManager } from "../packages/core/src/core/conversation-manager"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { defaultCharacter } from "../packages/core/src/core/character"; import { LogLevel } from "../packages/core/src/core/types"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; +import { makeFlowLifecycle } from "../packages/core/src/core/life-cycle"; -const scheduledTaskDb = new MongoDb( +const kvDb = new MongoDb( "mongodb://localhost:27017", "myApp", "scheduled_tasks" ); -await scheduledTaskDb.connect(); +await kvDb.connect(); console.log(chalk.green("✅ Scheduled task database connected")); -await scheduledTaskDb.deleteAll(); +await kvDb.deleteAll(); // ------------------------------------------------------ // 1) CREATE DAYDREAMS AGENT @@ -50,7 +51,7 @@ async function createDaydreamsAgent() { }); // 1.3. Room manager initialization - const roomManager = new RoomManager(vectorDb); + const conversationManager = new ConversationManager(vectorDb); const masterProcessor = new MasterProcessor( llmClient, @@ -69,10 +70,8 @@ async function createDaydreamsAgent() { // 1.5. Initialize core system const orchestrator = new Orchestrator( - roomManager, - vectorDb, masterProcessor, - scheduledTaskDb, + makeFlowLifecycle(kvDb, conversationManager), { level: loglevel, enableColors: true, @@ -102,6 +101,10 @@ async function createDaydreamsAgent() { message: string; }; console.log(`Reply to user ${userId ?? "??"}: ${message}`); + return { + userId, + message, + }; }, }); @@ -151,16 +154,13 @@ wss.on("connection", (ws) => { } // Process the message using the orchestrator with the provided userId - const outputs = await orchestrator.dispatchToInput( - "user_chat", - { - headers: { - "x-user-id": userId, - }, - }, - userMessage, - orchestratorId ? orchestratorId : undefined - ); + const outputs = await orchestrator.dispatchToInput("user_chat", { + userId, + platformId: "discord", + threadId: orchestratorId, + data: { content: userMessage }, + contentId: orchestratorId, + }); // Send responses back through WebSocket if (outputs && (outputs as any).length > 0) { @@ -211,8 +211,7 @@ app.get("/api/history/:userId", async (req, res) => { console.log("Fetching history for userId:", userId); // Get all orchestrator records for this user - const histories = - await scheduledTaskDb.getOrchestratorsByUserId(userId); + const histories = await kvDb.getOrchestratorsByUserId(userId); if (!histories || histories.length === 0) { console.log("No histories found"); @@ -241,7 +240,7 @@ app.get("/api/history/:userId/:chatId", async (req, res) => { return res.status(400).json({ error: "Invalid chat ID format" }); } - const history = await scheduledTaskDb.getOrchestratorById(objectId); + const history = await kvDb.getOrchestratorById(objectId); if (!history) { return res.status(404).json({ error: "History not found" }); diff --git a/examples/example-twitter.ts b/examples/example-twitter.ts index 7c7eae2e..bcc5d531 100644 --- a/examples/example-twitter.ts +++ b/examples/example-twitter.ts @@ -10,7 +10,7 @@ import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole } from "../packages/core/src/core/types"; import { TwitterClient } from "../packages/core/src/core/io/twitter"; -import { RoomManager } from "../packages/core/src/core/room-manager"; +import { ConversationManager } from "../packages/core/src/core/conversation-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { LLMClient } from "../packages/core/src/core/llm-client"; @@ -25,56 +25,59 @@ import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { SchedulerService } from "../packages/core/src/core/schedule-service"; import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; import { Logger } from "../packages/core/src/core/logger"; +import { makeFlowLifecycle } from "../packages/core/src/core/life-cycle"; async function main() { const loglevel = LogLevel.DEBUG; + // Initialize core dependencies const vectorDb = new ChromaVectorDB("twitter_agent", { chromaUrl: "http://localhost:8000", logLevel: loglevel, }); - await vectorDb.purge(); // Clear previous session data + // Clear previous session data + await vectorDb.purge(); - const roomManager = new RoomManager(vectorDb); + // Initialize room manager + const conversationManager = new ConversationManager(vectorDb); + // Initialize LLM client const llmClient = new LLMClient({ - model: "openrouter:deepseek/deepseek-r1-distill-llama-70b", + model: "anthropic/claude-3-5-sonnet-latest", temperature: 0.3, }); + // Initialize master processor const masterProcessor = new MasterProcessor( llmClient, defaultCharacter, loglevel ); - // Initialize processor with default character personality - const messageProcessor = new MessageProcessor( - llmClient, - defaultCharacter, - loglevel - ); - - masterProcessor.addProcessor(messageProcessor); + // Add message processor to master processor + masterProcessor.addProcessor([ + new MessageProcessor(llmClient, defaultCharacter, loglevel), + ]); - const scheduledTaskDb = new MongoDb( + // Initialize MongoDB for scheduled tasks + const kvDb = new MongoDb( "mongodb://localhost:27017", "myApp", "scheduled_tasks" ); - await scheduledTaskDb.connect(); + // Connect to MongoDB + await kvDb.connect(); console.log(chalk.green("✅ Scheduled task database connected")); - await scheduledTaskDb.deleteAll(); + // Delete previous data for testing + await kvDb.deleteAll(); // Initialize core system const orchestrator = new Orchestrator( - roomManager, - vectorDb, masterProcessor, - scheduledTaskDb, + makeFlowLifecycle(kvDb, conversationManager), { level: loglevel, enableColors: true, @@ -82,6 +85,7 @@ async function main() { } ); + // Initialize scheduler service const scheduler = new SchedulerService( { logger: new Logger({ @@ -89,14 +93,15 @@ async function main() { enableColors: true, enableTimestamp: true, }), - orchestratorDb: scheduledTaskDb, - roomManager: roomManager, + orchestratorDb: kvDb, + conversationManager: conversationManager, vectorDb: vectorDb, }, orchestrator, 10000 ); + // Start scheduler service scheduler.start(); // Set up Twitter client with credentials @@ -110,13 +115,13 @@ async function main() { ); // Initialize autonomous thought generation - const consciousness = new Consciousness(llmClient, roomManager, { + const consciousness = new Consciousness(llmClient, conversationManager, { intervalMs: 300000, // Think every 5 minutes minConfidence: 0.7, logLevel: loglevel, }); - // Register input handler for Twitter mentions + // Register input handler for Twitter mentions orchestrator.registerIOHandler({ name: "twitter_mentions", role: HandlerRole.INPUT, @@ -126,19 +131,26 @@ async function main() { const mentionsInput = twitter.createMentionsInput(60000); const mentions = await mentionsInput.handler(); - // If no new mentions, return null to skip processing - if (!mentions || mentions.length === 0) { - return null; + // If no new mentions, return an empty array to skip processing + if (!mentions || !mentions.length) { + return []; } - return mentions.map((mention) => ({ - type: "tweet", - room: mention.metadata.conversationId, - contentId: mention.metadata.tweetId, - user: mention.metadata.username, - content: mention.content, - metadata: mention, - })); + // Filter out mentions that do not have the required non-null properties before mapping + return mentions + .filter( + (mention) => + mention.metadata.tweetId !== undefined && + mention.metadata.conversationId !== undefined && + mention.metadata.userId !== undefined + ) + .map((mention) => ({ + userId: mention.metadata.userId!, + threadId: mention.metadata.conversationId!, + contentId: mention.metadata.tweetId!, + platformId: "twitter", + data: mention, + })); }, }); @@ -152,10 +164,16 @@ async function main() { // If no thought was generated or it was already processed, skip if (!thought || !thought.content) { - return null; + return []; } - return thought; + return { + userId: "internal", + threadId: "internal", + contentId: "internal", + platformId: "internal", + data: thought, + }; }, }); @@ -166,6 +184,7 @@ async function main() { execute: async (data: unknown) => { const thoughtData = data as { content: string }; + // Post thought to Twitter return twitter.createTweetOutput().handler({ content: thoughtData.content, }); @@ -184,14 +203,16 @@ async function main() { ), }); - // Schedule a task to run every minute - await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000); // Check mentions every minute + // Schedule a task to check mentions every minute + await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 60000); + + // Schedule a task to generate thoughts every 5 minutes await scheduler.scheduleTaskInDb( "sleever", "consciousness_thoughts", {}, - 30000 - ); // Think every 5 minutes + 300000 + ); // Register output handler for Twitter replies orchestrator.registerIOHandler({ @@ -200,6 +221,7 @@ async function main() { execute: async (data: unknown) => { const tweetData = data as { content: string; inReplyTo: string }; + // Post reply to Twitter return twitter.createTweetOutput().handler(tweetData); }, outputSchema: z diff --git a/packages/core/src/core/__tests__/vector-db.test.ts b/packages/core/src/core/__tests__/vector-db.test.ts index d04c04ef..efe75b5b 100644 --- a/packages/core/src/core/__tests__/vector-db.test.ts +++ b/packages/core/src/core/__tests__/vector-db.test.ts @@ -143,17 +143,17 @@ describe("ChromaVectorDB", () => { }); }); - // Room Operations - describe("Room Operations", () => { - test("listRooms() should return room collections", async () => { + // Conversation Operations + describe("Conversation Operations", () => { + test("listConversations() should return conversation collections", async () => { mockClient.listCollections.mockResolvedValueOnce([ - "room_123", - "room_456", + "conversation_123", + "conversation_456", "other_collection", ]); - const rooms = await db.listRooms(); - expect(rooms).toEqual(["123", "456"]); + const conversations = await db.listConversations(); + expect(conversations).toEqual(["123", "456"]); }); }); diff --git a/packages/core/src/core/consciousness.ts b/packages/core/src/core/consciousness.ts index 2a3bb475..0f3d4eb5 100644 --- a/packages/core/src/core/consciousness.ts +++ b/packages/core/src/core/consciousness.ts @@ -1,20 +1,20 @@ import { Logger } from "./logger"; import { LLMClient } from "./llm-client"; -import { type Room } from "./room"; -import { type RoomManager } from "./room-manager"; +import { Conversation } from "./conversation"; +import { ConversationManager } from "./conversation-manager"; import { LogLevel, type Thought } from "./types"; import { validateLLMResponseSchema } from "./utils"; import { z } from "zod"; export class Consciousness { - private static readonly ROOM_ID = "consciousness_main"; + private static readonly CONVERSATION_ID = "consciousness_main"; private logger: Logger; private thoughtInterval: NodeJS.Timer | null = null; constructor( private llmClient: LLMClient, - private roomManager: RoomManager, + private conversationManager: ConversationManager, private config: { intervalMs?: number; minConfidence?: number; @@ -59,7 +59,7 @@ export class Consciousness { confidence: thought.confidence, suggestedActions: thought.context?.suggestedActions || [], - roomId: Consciousness.ROOM_ID, + conversationId: Consciousness.CONVERSATION_ID, }, }; } else { @@ -110,7 +110,7 @@ export class Consciousness { private async generateThought(): Promise { const recentMemories = this.getRecentMemories( - await this.roomManager.listRooms() + await this.conversationManager.listConversations() ); const prompt = `Analyze these recent memories and generate an insightful thought. @@ -229,21 +229,21 @@ export class Consciousness { } private getRecentMemories( - rooms: Room[], + conversations: Conversation[], limit: number = 10 - ): Array<{ content: string; roomId: string }> { + ): Array<{ content: string; conversationId: string }> { const allMemories: Array<{ content: string; - roomId: string; + conversationId: string; timestamp: Date; }> = []; - for (const room of rooms) { - const memories = room.getMemories(5); // Get last 5 memories from each room + for (const conversation of conversations) { + const memories = conversation.getMemories(5); // Get last 5 memories from each conversation allMemories.push( ...memories.map((m) => ({ content: m.content, - roomId: room.id, + conversationId: conversation.id, timestamp: m.timestamp, })) ); @@ -253,6 +253,9 @@ export class Consciousness { return allMemories .sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime()) .slice(0, limit) - .map(({ content, roomId }) => ({ content, roomId })); + .map(({ content, conversationId }) => ({ + content, + conversationId, + })); } } diff --git a/packages/core/src/core/conversation-manager.ts b/packages/core/src/core/conversation-manager.ts new file mode 100644 index 00000000..d687c633 --- /dev/null +++ b/packages/core/src/core/conversation-manager.ts @@ -0,0 +1,342 @@ +import { Conversation } from "./conversation"; +import type { Memory, ConversationMetadata } from "./types"; +import { ChromaVectorDB } from "./vector-db"; +import { Logger } from "./logger"; +import { LogLevel } from "./types"; + +export class ConversationManager { + private logger: Logger; + + constructor( + private vectorDb?: ChromaVectorDB, + config: { + logLevel?: LogLevel; + } = {} + ) { + this.logger = new Logger({ + level: config.logLevel || LogLevel.INFO, + enableColors: true, + enableTimestamp: true, + }); + } + + public async getConversation( + conversationId: string + ): Promise { + if (!this.vectorDb) { + this.logger.warn( + "ConversationManager.getConversation", + "No VectorDB provided" + ); + return undefined; + } + + try { + const collection = + await this.vectorDb.getCollectionForConversation( + conversationId + ); + const metadata = collection.metadata; + + if (!metadata?.platform || !metadata?.platformId) { + this.logger.warn( + "ConversationManager.getConversation", + "Conversation missing required metadata", + { + conversationId, + } + ); + return undefined; + } + + return new Conversation( + metadata.platformId as string, + metadata.platform as string, + { + name: metadata.name as string, + description: metadata.description as string, + participants: metadata.participants as string[], + createdAt: new Date( + metadata.created as string | number | Date + ), + lastActive: new Date( + (metadata.lastActive || metadata.created) as + | string + | number + | Date + ), + } + ); + } catch (error) { + this.logger.error( + "ConversationManager.getConversation", + "Failed to get conversation", + { + error: + error instanceof Error ? error.message : String(error), + conversationId, + } + ); + return undefined; + } + } + + public async getConversationByPlatformId( + platformId: string, + platform: string + ): Promise { + if (platform === "consciousness") { + platformId = "main"; + } + + const conversationId = Conversation.createDeterministicId( + platform, + platformId + ); + return this.getConversation(conversationId); + } + + public async createConversation( + platformId: string, + platform: string, + metadata?: Partial + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for conversation creation"); + } + + const conversation = new Conversation(platformId, platform, metadata); + + try { + const collection = await this.vectorDb.getCollectionForConversation( + conversation.id + ); + + // Update collection with full conversation metadata including userId + await collection.modify({ + metadata: { + description: "Conversation-specific memory storage", + conversationId: conversation.id, + platform: conversation.platform, + platformId: conversation.platformId, + created: conversation.getMetadata().createdAt.toISOString(), + lastActive: conversation + .getMetadata() + .lastActive.toISOString(), + name: metadata?.name, + participants: metadata?.participants, + userId: metadata?.userId, // Include userId in collection metadata + }, + }); + + this.logger.debug( + "ConversationManager.createConversation", + "Conversation collection created", + { + conversationId: conversation.id, + platform, + platformId, + userId: metadata?.userId, // Log userId + } + ); + + return conversation; + } catch (error) { + this.logger.error( + "ConversationManager.createConversation", + "Failed to create conversation collection", + { + error: + error instanceof Error ? error.message : String(error), + conversationId: conversation.id, + userId: metadata?.userId, // Log userId in errors + } + ); + throw error; + } + } + + public async addMemory( + conversationId: string, + content: string, + metadata?: Record + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for adding memories"); + } + + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new Error(`Conversation ${conversationId} not found`); + } + + const memory = await conversation.addMemory(content, metadata); + + // Store in conversation-specific collection with userId from metadata + await this.vectorDb.storeInConversation( + memory.content, + conversation.id, + { + memoryId: memory.id, + timestamp: memory.timestamp, + platform: conversation.platform, + userId: metadata?.userId, // Include userId in vector storage + ...metadata, + } + ); + + return memory; + } + + public async findSimilarMemoriesInConversation( + content: string, + conversationId: string, + limit = 5 + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for finding memories"); + } + + const results = await this.vectorDb.findSimilarInConversation( + content, + conversationId, + limit + ); + + return results.map((result) => ({ + id: result.metadata?.memoryId, + conversationId: conversationId, + content: result.content, + timestamp: new Date(result.metadata?.timestamp), + metadata: result.metadata, + })); + } + + public async listConversations(): Promise { + if (!this.vectorDb) { + return []; + } + + const conversationIds = await this.vectorDb.listConversations(); + const conversations: Conversation[] = []; + + for (const conversationId of conversationIds) { + const conversation = await this.getConversation(conversationId); + if (conversation) { + conversations.push(conversation); + } + } + + return conversations; + } + + public async ensureConversation( + name: string, + platform: string, + userId?: string + ): Promise { + let conversation = await this.getConversationByPlatformId( + name, + platform + ); + if (!conversation) { + conversation = await this.createConversation(name, platform, { + name, + description: `Conversation for ${name}`, + participants: [], + userId, // Add userId to metadata + }); + } + return conversation; + } + + public async deleteConversation(conversationId: string): Promise { + if (!this.vectorDb) { + return; + } + + await this.vectorDb.deleteConversation(conversationId); + this.logger.info( + "ConversationManager.deleteConversation", + "Conversation deleted", + { conversationId } + ); + } + + public async getMemoriesFromConversation( + conversationId: string, + limit?: number + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for getting memories"); + } + + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new Error(`Conversation ${conversationId} not found`); + } + + const memories = await this.vectorDb.getMemoriesFromConversation( + conversationId, + limit + ); + + return memories.map((memory) => ({ + id: memory.metadata?.memoryId, + conversationId: conversationId, + content: memory.content, + timestamp: new Date(memory.metadata?.timestamp), + metadata: memory.metadata, + })); + } + + public async hasProcessedContentInConversation( + contentId: string, + conversationId: string + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for getting memories"); + } + + const conversation = await this.getConversation(conversationId); + if (!conversation) { + this.logger.error( + "ConversationManager.markContentAsProcessed", + "Conversation not found", + { + conversationId, + } + ); + return false; + } + + return await this.vectorDb.hasProcessedContent(contentId, conversation); + } + + public async markContentAsProcessed( + contentId: string, + conversationId: string + ): Promise { + if (!this.vectorDb) { + throw new Error( + "VectorDB required for marking content as processed" + ); + } + + const conversation = await this.getConversation(conversationId); + + if (!conversation) { + this.logger.error( + "ConversationManager.markContentAsProcessed", + "Conversation not found", + { + conversationId, + } + ); + return false; + } + + await this.vectorDb.markContentAsProcessed(contentId, conversation); + return true; + } +} diff --git a/packages/core/src/core/room.ts b/packages/core/src/core/conversation.ts similarity index 59% rename from packages/core/src/core/room.ts rename to packages/core/src/core/conversation.ts index 8279d1a5..be78864a 100644 --- a/packages/core/src/core/room.ts +++ b/packages/core/src/core/conversation.ts @@ -1,33 +1,33 @@ import { createHash } from "crypto"; -import type { RoomMetadata } from "./types"; +import type { ConversationMetadata } from "./types"; import type { Memory } from "./types"; /** - * Represents a room/conversation context that can store memories and metadata. + * Represents a conversation context that can store memories and metadata. */ -export class Room { - /** Unique identifier for the room */ +export class Conversation { + /** Unique identifier for the conversation */ public readonly id: string; - /** Collection of memories associated with this room */ + /** Collection of memories associated with this conversation */ private memories: Memory[] = []; - /** Metadata about the room like name, description, participants etc */ - private metadata: RoomMetadata; + /** Metadata about the conversation like name, description, participants etc */ + private metadata: ConversationMetadata; /** - * Creates a new Room instance + * Creates a new Conversation instance * @param platformId - Platform-specific identifier (e.g. tweet thread ID, chat ID) - * @param platform - Platform name where this room exists - * @param metadata - Optional metadata to initialize the room with + * @param platform - Platform name where this conversation exists + * @param metadata - Optional metadata to initialize the conversation with */ constructor( public readonly platformId: string, public readonly platform: string, - metadata?: Partial + metadata?: Partial ) { - this.id = Room.createDeterministicId(platform, platformId); + this.id = Conversation.createDeterministicId(platform, platformId); this.metadata = { - name: metadata?.name || `Room ${platformId}`, + name: metadata?.name || `Conversation ${platformId}`, description: metadata?.description, participants: metadata?.participants || [], createdAt: metadata?.createdAt || new Date(), @@ -37,10 +37,10 @@ export class Room { } /** - * Creates a deterministic room ID based on platform and platformId + * Creates a deterministic conversation ID based on platform and platformId * @param platform - Platform name * @param platformId - Platform-specific identifier - * @returns A deterministic room ID string + * @returns A deterministic conversation ID string */ public static createDeterministicId( platform: string, @@ -55,7 +55,7 @@ export class Room { } /** - * Adds a new memory to the room + * Adds a new memory to the conversation * @param content - Content of the memory * @param metadata - Optional metadata for the memory * @returns The created Memory object @@ -64,12 +64,15 @@ export class Room { content: string, metadata?: Record ): Promise { - // Create deterministic memory ID based on room ID and content - const memoryId = Room.createDeterministicMemoryId(this.id, content); + // Create deterministic memory ID based on Conversation ID and content + const memoryId = Conversation.createDeterministicMemoryId( + this.id, + content + ); const memory: Memory = { id: memoryId, - roomId: this.id, + conversationId: this.id, content, timestamp: new Date(), metadata, @@ -82,17 +85,17 @@ export class Room { } /** - * Creates a deterministic memory ID based on room ID and content - * @param roomId - ID of the room + * Creates a deterministic memory ID based on conversation ID and content + * @param conversationId - ID of the conversation * @param content - Content of the memory * @returns A deterministic memory ID string */ public static createDeterministicMemoryId( - roomId: string, + conversationId: string, content: string ): string { const hash = createHash("sha256") - .update(`${roomId}:${content}`) + .update(`${conversationId}:${content}`) .digest("hex") .slice(0, 16); @@ -100,7 +103,7 @@ export class Room { } /** - * Retrieves memories from the room + * Retrieves memories from the conversation * @param limit - Optional limit on number of memories to return * @returns Array of Memory objects */ @@ -109,18 +112,18 @@ export class Room { } /** - * Gets a copy of the room's metadata - * @returns Copy of room metadata + * Gets a copy of the conversation's metadata + * @returns Copy of conversation metadata */ - public getMetadata(): RoomMetadata { + public getMetadata(): ConversationMetadata { return { ...this.metadata }; } /** - * Updates the room's metadata + * Updates the conversation's metadata * @param update - Partial metadata object with fields to update */ - public updateMetadata(update: Partial): void { + public updateMetadata(update: Partial): void { this.metadata = { ...this.metadata, ...update, @@ -129,8 +132,8 @@ export class Room { } /** - * Converts the room instance to a plain object - * @returns Plain object representation of the room + * Converts the conversation instance to a plain object + * @returns Plain object representation of the conversation */ public toJSON() { return { diff --git a/packages/core/src/core/db/mongo-db.ts b/packages/core/src/core/db/mongo-db.ts index ffe37a71..056a6901 100644 --- a/packages/core/src/core/db/mongo-db.ts +++ b/packages/core/src/core/db/mongo-db.ts @@ -1,17 +1,20 @@ import { MongoClient, Collection, ObjectId } from "mongodb"; -import type { HandlerRole } from "../types"; import type { + Chat, + ChatMessage, + HandlerRole, OrchestratorChat, - OrchestratorDb, - OrchestratorMessage, ScheduledTask, -} from "../memory"; +} from "../types"; +import type { OrchestratorDb } from "../memory"; export class MongoDb implements OrchestratorDb { private client: MongoClient; private collection!: Collection; private orchestratorCollection!: Collection; + private chatsCollection!: Collection; + /** * @param uri A MongoDB connection string * @param dbName Name of the database to use @@ -36,6 +39,18 @@ export class MongoDb implements OrchestratorDb { const db = this.client.db(this.dbName); this.collection = db.collection(this.collectionName); + this.chatsCollection = db.collection("chats"); + + // Create indexes for efficient querying + await this.chatsCollection.createIndex( + { + userId: 1, + platformId: 1, + threadId: 1, + }, + { unique: true } + ); + // Optional: Create indexes // - An index on nextRunAt helps find "due" tasks quickly // - An index on status helps filter quickly by status @@ -185,18 +200,33 @@ export class MongoDb implements OrchestratorDb { await this.collection.deleteMany({}); } - /** - * Creates a new "orchestrator" document for a user, returning its generated _id. - * This can represent a "new chat/session" with the agent. - */ - public async createOrchestrator(userId: string): Promise { - const chat: OrchestratorChat = { - userId: userId, + public async getOrCreateChat( + userId: string, + platformId: string, + threadId: string, + metadata?: Record + ): Promise { + const existingChat = await this.chatsCollection.findOne({ + userId, + platformId, + threadId, + }); + + if (existingChat) { + return existingChat._id!.toString(); + } + + const chat: Chat = { + userId, + platformId, + threadId, createdAt: new Date(), updatedAt: new Date(), messages: [], + metadata, }; - const result = await this.orchestratorCollection.insertOne(chat); + + const result = await this.chatsCollection.insertOne(chat); return result.insertedId.toString(); } @@ -208,14 +238,14 @@ export class MongoDb implements OrchestratorDb { * @param name - The name/id of the IOHandler. * @param data - The data payload to store (e.g., text, JSON from APIs, etc). */ - public async addMessage( - orchestratorId: string, + public async addChatMessage( + chatId: string, role: HandlerRole, name: string, data: unknown ): Promise { - await this.orchestratorCollection.updateOne( - { _id: orchestratorId }, + await this.chatsCollection.updateOne( + { _id: chatId }, { $push: { messages: { @@ -235,12 +265,11 @@ export class MongoDb implements OrchestratorDb { /** * Retrieves all messages in a specific orchestrator's conversation. */ - public async getMessages( - orchestratorId: string - ): Promise { - const doc = await this.orchestratorCollection.findOne({ - _id: orchestratorId, + public async getChatMessages(chatId: string): Promise { + const doc = await this.chatsCollection.findOne({ + _id: chatId, }); + if (!doc) return []; return doc.messages; } diff --git a/packages/core/src/core/index.ts b/packages/core/src/core/index.ts index c77b2054..80a5ca1b 100644 --- a/packages/core/src/core/index.ts +++ b/packages/core/src/core/index.ts @@ -1,6 +1,6 @@ import { Orchestrator } from "./orchestrator"; -import { RoomManager } from "./room-manager"; -import { Room } from "./room"; +import { ConversationManager } from "./conversation-manager"; +import { Conversation } from "./conversation"; import { ChromaVectorDB } from "./vector-db"; import { BaseProcessor } from "./processor"; import { GoalManager } from "./goal-manager"; @@ -32,8 +32,8 @@ export { Orchestrator, Processors, Providers, - Room, - RoomManager, + Conversation, + ConversationManager, StepManager, Types, Utils, diff --git a/packages/core/src/core/io/discord.ts b/packages/core/src/core/io/discord.ts index c3be1be4..45a92e4e 100644 --- a/packages/core/src/core/io/discord.ts +++ b/packages/core/src/core/io/discord.ts @@ -6,7 +6,12 @@ import { Partials, } from "discord.js"; import { Logger } from "../../core/logger"; -import { HandlerRole, LogLevel, type IOHandler } from "../types"; +import { + HandlerRole, + LogLevel, + type IOHandler, + type ProcessableContent, +} from "../types"; import { env } from "../../core/env"; import { z } from "zod"; @@ -75,7 +80,9 @@ export class DiscordClient { * Optionally start listening to Discord messages. * The onData callback typically feeds data into Orchestrator or similar. */ - public startMessageStream(onData: (data: any) => void) { + public startMessageStream( + onData: (data: ProcessableContent | ProcessableContent[]) => void + ) { this.logger.info("DiscordClient", "Starting message stream..."); // If you want to capture the listener reference for removal: @@ -93,9 +100,13 @@ export class DiscordClient { } onData({ - content: message.content, - channelId: message.channelId, - sentBy: message.author?.id, + userId: message.author?.displayName, + platformId: "discord", + threadId: message.channel.id, + contentId: message.id, + data: { + content: message.content, + }, }); }; @@ -132,7 +143,10 @@ export class DiscordClient { role: HandlerRole.OUTPUT, name: "discord_message", execute: async (data: T) => { - return await this.sendMessage(data as MessageData); + // Cast the result to ProcessableContent to satisfy the IOHandler signature. + return (await this.sendMessage( + data as MessageData + )) as unknown as ProcessableContent; }, outputSchema: messageSchema, }; diff --git a/packages/core/src/core/life-cycle.ts b/packages/core/src/core/life-cycle.ts new file mode 100644 index 00000000..d7503e80 --- /dev/null +++ b/packages/core/src/core/life-cycle.ts @@ -0,0 +1,238 @@ +import { HandlerRole, type Memory } from "./types"; +import type { OrchestratorDb } from "./memory"; +import type { ConversationManager } from "./conversation-manager"; +import type { Conversation } from "./conversation"; + +export function makeFlowLifecycle( + orchestratorDb: OrchestratorDb, + conversationManager: ConversationManager +): FlowLifecycle { + return { + async onFlowStart( + userId: string, + platformId: string, + threadId: string, + initialData: unknown + ): Promise { + return orchestratorDb.getOrCreateChat(userId, platformId, threadId); + }, + + async onFlowStep( + chatId: string | undefined, + role: HandlerRole, + sourceName: string, + data: unknown + ): Promise { + if (!chatId) return; + await orchestratorDb.addChatMessage(chatId, role, sourceName, data); + }, + + async onTasksScheduled( + userId: string, + tasks: { name: string; data: unknown; intervalMs?: number }[] + ): Promise { + for (const task of tasks) { + const now: number = Date.now(); + const nextRunAt: Date = new Date(now + (task.intervalMs ?? 0)); + await orchestratorDb.createTask( + userId, + task.name, + { + request: task.name, + task_data: JSON.stringify(task.data), + }, + nextRunAt, + task.intervalMs + ); + } + }, + + async onContentProcessed( + contentId: string, + threadId: string, + content: string, + metadata?: Record + ): Promise { + const hasProcessed = + await conversationManager.hasProcessedContentInConversation( + contentId, + threadId + ); + if (hasProcessed) { + return; + } + await conversationManager.markContentAsProcessed( + contentId, + threadId + ); + }, + + async onConversationCreated( + userId: string, + threadId: string, + source: string + ): Promise { + return await conversationManager.ensureConversation( + threadId, + source, + userId + ); + }, + + async onConversationUpdated( + contentId: string, + threadId: string, + content: string, + source: string, + updates: Record + ): Promise { + await conversationManager.markContentAsProcessed( + contentId, + threadId + ); + }, + + async onMemoryAdded( + chatId: string, + content: string, + source: string, + updates: Record + ): Promise { + await conversationManager.addMemory(chatId, content, { + source, + ...updates, + }); + + await orchestratorDb.addChatMessage( + chatId, + HandlerRole.INPUT, + source, + { + content, + chatId, + } + ); + }, + + async onMemoriesRequested( + chatId: string, + limit?: number + ): Promise<{ memories: Memory[] }> { + // get vector based memories + // todo, we could base this on a userID so the agent has memories across conversations + const memories = + await conversationManager.getMemoriesFromConversation( + chatId, + limit + ); + + return { memories }; + }, + + async onCheckContentProcessed( + contentId: string, + chatId: string + ): Promise { + return await conversationManager.hasProcessedContentInConversation( + contentId, + chatId + ); + }, + }; +} + +/** + * A set of lifecycle callbacks for orchestrator events. + */ +export interface FlowLifecycle { + /** + * Called when a new flow is started or continued. + * Allows you to create or fetch an Orchestrator record, returning an ID if relevant. + */ + onFlowStart( + userId: string, + platformId: string, + threadId: string, + initialData: unknown + ): Promise; + + /** + * Called when new data is processed in the flow (e.g., an input message). + */ + onFlowStep( + chatId: string, + role: HandlerRole, + sourceName: string, + data: unknown + ): Promise; + + /** + * Called when the Orchestrator wants to schedule tasks (e.g. recurring tasks). + * You can store them in your DB or in a queue system. + */ + onTasksScheduled( + userId: string, + tasks: { + name: string; + data: unknown; + intervalMs?: number; + }[] + ): Promise; + + /** + * Called when content has been processed in a conversation + */ + onContentProcessed( + userId: string, + threadId: string, + content: string, + metadata?: Record + ): Promise; + + /** + * Called when a new conversation is created + */ + onConversationCreated( + userId: string, + threadId: string, + source: string, + metadata?: Record + ): Promise; + + /** + * Called when a conversation is updated + */ + onConversationUpdated( + contentId: string, + threadId: string, + content: string, + source: string, + updates: Record + ): Promise; + + /** + * Called when a new memory needs to be added to a conversation + */ + onMemoryAdded( + chatId: string, + content: string, + source: string, + updates: Record + ): Promise; + + /** + * Called when memories need to be retrieved for a conversation + */ + onMemoriesRequested( + chatId: string, + limit?: number + ): Promise<{ memories: Memory[] }>; + + /** + * Called to check if specific content has been processed in a conversation + */ + onCheckContentProcessed( + contentId: string, + chatId: string + ): Promise; +} diff --git a/packages/core/src/core/memory.ts b/packages/core/src/core/memory.ts index c5410973..7b1ce65d 100644 --- a/packages/core/src/core/memory.ts +++ b/packages/core/src/core/memory.ts @@ -1,33 +1,11 @@ -import type { HandlerRole, Memory } from "./types"; -import type { Room } from "./room"; - -// Define interfaces matching MongoDB document shapes -export interface ScheduledTask { - _id: string; - userId: string; - handlerName: string; - taskData: Record; - nextRunAt: Date; - intervalMs?: number; - status: "pending" | "running" | "completed" | "failed"; - createdAt: Date; - updatedAt: Date; -} - -export interface OrchestratorMessage { - role: HandlerRole; - name: string; - data: unknown; - timestamp: Date; -} - -export interface OrchestratorChat { - _id?: string; - userId: string; - createdAt: Date; - updatedAt: Date; - messages: OrchestratorMessage[]; -} +import type { + ChatMessage, + HandlerRole, + Memory, + OrchestratorChat, + ScheduledTask, +} from "./types"; +import type { Conversation } from "./conversation"; export interface OrchestratorDb { connect(): Promise; @@ -36,14 +14,19 @@ export interface OrchestratorDb { // Orchestrator methods getOrchestratorById(id: string): Promise; getOrchestratorsByUserId(userId: string): Promise; - createOrchestrator(userId: string): Promise; - addMessage( - orchestratorId: string, + getOrCreateChat( + userId: string, + platformId: string, + threadId: string, + metadata?: Record + ): Promise; + addChatMessage( + chatId: string, role: HandlerRole, name: string, - data: any + data: unknown ): Promise; - getMessages(orchestratorId: string): Promise; + getChatMessages(chatId: string): Promise; // Task management methods createTask( @@ -62,12 +45,23 @@ export interface OrchestratorDb { } export interface MemoryManager { - hasProcessedContentInRoom( + hasProcessedContentInConversation( contentId: string, - roomId: string + conversationId: string ): Promise; - ensureRoom(roomId: string, source: string, userId?: string): Promise; - getMemoriesFromRoom(roomId: string): Promise; - addMemory(roomId: string, content: string, metadata?: any): Promise; - markContentAsProcessed(contentId: string, roomId: string): Promise; + ensureConversation( + conversationId: string, + source: string, + userId?: string + ): Promise; + getMemoriesFromConversation(conversationId: string): Promise; + addMemory( + conversationId: string, + content: string, + metadata?: any + ): Promise; + markContentAsProcessed( + contentId: string, + conversationId: string + ): Promise; } diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index bdb55099..146323f3 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -1,20 +1,13 @@ import { Logger } from "./logger"; -import { RoomManager } from "./room-manager"; import type { BaseProcessor } from "./processor"; -import type { AgentRequest, Memory, ProcessedResult, VectorDB } from "./types"; +import type { Memory, ProcessableContent, ProcessedResult } from "./types"; import { HandlerRole, LogLevel, type LoggerConfig } from "./types"; import type { IOHandler } from "./types"; +import type { FlowLifecycle } from "./life-cycle"; -import type { OrchestratorDb } from "./memory"; - -/** - * Orchestrator system that manages both "input" and "output" handlers - * in a unified manner, along with scheduling recurring inputs. - */ export class Orchestrator { /** - * Unified collection of IOHandlers (both input & output). - * Keyed by .name + * Unified collection of IOHandlers (both input & output), keyed by name. */ private readonly ioHandlers = new Map(); @@ -24,30 +17,15 @@ export class Orchestrator { private readonly logger: Logger; /** - * orchestratorDb instance for database operations. - */ - private readonly orchestratorDb: OrchestratorDb; - - /** - * Map of unsubscribe functions for various handlers. - * Keyed by handler name. + * Map of unsubscribe functions for various handlers, keyed by handler name. */ private unsubscribers = new Map void>(); - /** - * Other references in your system. Adjust as needed. - */ - public readonly vectorDb: VectorDB; - constructor( - private readonly roomManager: RoomManager, - vectorDb: VectorDB, private processor: BaseProcessor, - orchestratorDb: OrchestratorDb, + private readonly flowHooks: FlowLifecycle, config?: LoggerConfig ) { - this.vectorDb = vectorDb; - this.orchestratorDb = orchestratorDb; this.logger = new Logger( config ?? { level: LogLevel.ERROR, @@ -67,9 +45,8 @@ export class Orchestrator { } /** - * Primary method to register any IOHandler (input or output). - * - If it's an input with an interval, schedule it for recurring runs. - * - Otherwise, just store it in the ioHandlers map. + * Registers an IOHandler (input or output). For input handlers with a subscribe method, + * registers the subscription and stores its unsubscriber. */ public registerIOHandler(handler: IOHandler): void { if (this.ioHandlers.has(handler.name)) { @@ -89,11 +66,7 @@ export class Orchestrator { "Starting stream", { data } ); - // Simulate a request-like object here if you want a consistent approach. - // this will register as an agent request - const fakeRequest: AgentRequest = { headers: {} }; - // Whenever data arrives, pass it into runAutonomousFlow - await this.runAutonomousFlow(fakeRequest, data, handler.name); + await this.run(data, handler.name); }); this.unsubscribers.set(handler.name, unsubscribe); } @@ -106,17 +79,15 @@ export class Orchestrator { } /** - * Removes a handler (input or output) by name, stopping scheduling if needed. + * Removes a handler (input or output) by name and stops its scheduling if needed. */ public removeIOHandler(name: string): void { - // If we have an unsubscribe function, call it const unsub = this.unsubscribers.get(name); if (unsub) { - unsub(); // e.g. remove event listeners, clear intervals, etc. + unsub(); // E.g. remove event listeners, clear intervals, etc. this.unsubscribers.delete(name); } - // Remove the handler itself this.ioHandlers.delete(name); this.logger.info("Orchestrator.removeIOHandler", "Removed IOHandler", { @@ -125,75 +96,64 @@ export class Orchestrator { } /** - * Dispatches data to a registered *output* handler by name, passing in a request plus data. + * Dispatches data to a registered *output* handler by name. */ public async dispatchToOutput( name: string, - request: AgentRequest, - data: T + data: ProcessableContent ): Promise { const handler = this.ioHandlers.get(name); if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); } - - if (handler.role !== "output") { + if (handler.role !== HandlerRole.OUTPUT) { throw new Error(`Handler "${name}" is not an output handler`); } this.logger.debug("Orchestrator.dispatchToOutput", "Executing output", { name, data, - headers: request.headers, }); try { const result = await handler.execute(data); - this.logger.info("Orchestrator.dispatchToOutput", "Output result", { result, }); - return result; } catch (error) { this.logger.error( "Orchestrator.dispatchToOutput", "Handler threw an error", - { - name, - error, - } + { name, error } ); throw error; } } /** - * Dispatches data to a registered *action* handler by name, passing in a request plus data. + * Dispatches data to a registered *action* handler by name. */ public async dispatchToAction( name: string, - request: AgentRequest, - data: T + data: ProcessableContent ): Promise { const handler = this.ioHandlers.get(name); if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); } - if (handler.role !== "action") { + if (handler.role !== HandlerRole.ACTION) { throw new Error(`Handler "${name}" is not an action handler`); } try { const result = await handler.execute(data); - this.logger.debug( "Orchestrator.dispatchToAction", "Executing action", { name, data, - headers: request.headers, } ); return result; @@ -201,45 +161,34 @@ export class Orchestrator { this.logger.error( "Orchestrator.dispatchToAction", "Handler threw an error", - { - name, - error, - } + { name, error } ); throw error; } } /** - * Dispatches data to a registered *input* handler by name, passing in a request plus data. - * Then continues through the autonomous flow. + * Dispatches data to a registered *input* handler by name, then continues through the autonomous flow. */ public async dispatchToInput( name: string, - request: AgentRequest, - data: T, - orchestratorId?: string + data: ProcessableContent ): Promise { const handler = this.ioHandlers.get(name); - if (!handler) throw new Error(`No IOHandler: ${name}`); + if (!handler) { + throw new Error(`No IOHandler: ${name}`); + } if (!handler.execute) { throw new Error(`Handler "${name}" has no execute method`); } - if (handler.role !== "input") { + if (handler.role !== HandlerRole.INPUT) { throw new Error(`Handler "${name}" is not role=input`); } try { - // Possibly run a transformation or normalizing step inside `handler.execute` const result = await handler.execute(data); - if (result) { - return await this.runAutonomousFlow( - request, - result, - handler.name, - orchestratorId - ); + return await this.run(result, handler.name); } return []; } catch (error) { @@ -254,255 +203,168 @@ export class Orchestrator { } /** - * Takes some incoming piece of data, processes it through the system, - * and handles any follow-on "action" or "output" suggestions in a queue. + * Main processing loop: feeds incoming data into the processing queue and + * dispatches any suggested outputs or actions. * - * @param request A request-like object (headers, etc.) from which we extract user info - * @param initialData The data payload to process - * @param sourceName The IOHandler name that provided this data - * @param orchestratorId An optional existing orchestrator record ID to tie into + * @param data The initial data or array of data to process. + * @param sourceName The name of the IOHandler that provided this data. */ - private async runAutonomousFlow( - request: AgentRequest, - initialData: unknown, - sourceName: string, - orchestratorId?: string - ) { - // For illustration, extract userId from headers. Adjust the header name as needed. - const userId = request.headers["x-user-id"] || "agent"; + private async run( + data: ProcessableContent | ProcessableContent[], + sourceName: string + ): Promise> { + // Initialize the processing queue + const queue: Array<{ data: ProcessableContent; source: string }> = + Array.isArray(data) + ? data.map((item) => ({ data: item, source: sourceName })) + : [{ data, source: sourceName }]; + + const collectedOutputs: Array<{ name: string; data: any }> = []; - const queue: Array<{ data: unknown; source: string }> = []; - - // If the initial data is already an array, enqueue each item - if (Array.isArray(initialData)) { - for (const item of initialData) { - queue.push({ data: item, source: sourceName }); - } - } else { - queue.push({ data: initialData, source: sourceName }); + while (queue.length > 0) { + const currentItem = queue.shift()!; + const outputs = await this.processQueueItem(currentItem, queue); + collectedOutputs.push(...outputs); } - // Optionally store final outputs to return or do something with them + return collectedOutputs; + } + + /** + * Processes one queue item: + * - Starts a conversation flow. + * - Processes the content. + * - Dispatches any suggested outputs or actions. + * + * @param item The queue item containing the data and its source. + * @param queue The current processing queue (to which new items may be added). + */ + private async processQueueItem( + item: { data: ProcessableContent; source: string }, + queue: Array<{ data: ProcessableContent; source: string }> + ): Promise> { + const { data, source } = item; const outputs: Array<{ name: string; data: any }> = []; - // If orchestratorId is provided, verify it in the DB or create a new record - if (orchestratorId) { - const existing = - await this.orchestratorDb.getOrchestratorById(orchestratorId); - if (!existing) { - orchestratorId = - await this.orchestratorDb.createOrchestrator(userId); - } - } + // Start the conversation/flow. + const chatId = await this.flowHooks.onFlowStart( + data.userId, + data.platformId, + data.threadId, + data.data + ); - // Otherwise, create a new orchestrator record if needed - if (!orchestratorId) { - orchestratorId = - await this.orchestratorDb.createOrchestrator(userId); - } + await this.flowHooks.onFlowStep( + chatId, + HandlerRole.INPUT, + source, + data + ); - // Record initial data as an input message - if (orchestratorId) { - await this.orchestratorDb.addMessage( - orchestratorId, - HandlerRole.INPUT, - sourceName, - initialData - ); - this.logger.debug( - "Orchestrator.runAutonomousFlow", - "Created or continued orchestrator record", - { - orchestratorId, - userId, - } - ); + // Process the content. + const processedResults = await this.processContent(data, source); + if (!processedResults?.length) { + return outputs; } - // Process items in a queue - while (queue.length > 0) { - const { data, source } = queue.shift()!; - - // Record each chunk of data if you want - if (orchestratorId) { - await this.orchestratorDb.addMessage( - orchestratorId, - HandlerRole.INPUT, - source, - data - ); - - this.logger.debug( - "Orchestrator.runAutonomousFlow", - "Added message to orchestrator record", - { - orchestratorId, - message: { - role: HandlerRole.INPUT, - name: source, - data, - }, - } - ); - } - - // processContent can return an array of ProcessedResult - const processedResults = await this.processContent( - data, - source, - userId - ); - - if (!processedResults || processedResults.length === 0) { + // Handle each processed result. + for (const result of processedResults) { + if (result.alreadyProcessed) { continue; } - for (const processed of processedResults) { - // If the item was already processed, skip - if (processed.alreadyProcessed) continue; - - // Possibly schedule any tasks in the DB - if (processed.updateTasks) { - for (const task of processed.updateTasks) { - const now = Date.now(); - const nextRunAt = new Date( - now + (task.intervalMs ?? 0) - ); - this.logger.info( - "Orchestrator.runAutonomousFlow", - `Scheduling task ${task.name}`, - { nextRunAt, intervalMs: task.intervalMs } - ); + // Schedule any tasks if present. + if (result.updateTasks?.length) { + await this.flowHooks.onTasksScheduled( + data.userId, + result.updateTasks.map((task) => ({ + name: task.name, + data: task.data, + intervalMs: task.intervalMs, + })) + ); + } - await this.orchestratorDb.createTask( - userId, - task.name, - { - request: task.name, - task_data: JSON.stringify(task.data), - }, - nextRunAt, - task.intervalMs - ); - } + // Process any suggested outputs or actions. + for (const suggestion of result.suggestedOutputs ?? []) { + const handler = this.ioHandlers.get(suggestion.name); + if (!handler) { + this.logger.warn( + "Orchestrator.processQueueItem", + `No handler found for suggested output: ${suggestion.name}` + ); + continue; } - // For each suggested output or action - for (const output of processed.suggestedOutputs ?? []) { - const handler = this.ioHandlers.get(output.name); - if (!handler) { - this.logger.warn( - "Orchestrator.runAutonomousFlow", - `No handler found for suggested output: ${output.name}` - ); - continue; - } - - if (handler.role === HandlerRole.OUTPUT) { - // e.g. send a Slack message - outputs.push({ name: output.name, data: output.data }); + switch (handler.role) { + case HandlerRole.OUTPUT: + outputs.push({ + name: suggestion.name, + data: suggestion.data, + }); await this.dispatchToOutput( - output.name, - request, - output.data + suggestion.name, + suggestion.data ); - - this.logger.debug( - "Orchestrator.runAutonomousFlow", - "Dispatched output", - { - name: output.name, - data: output.data, - } + await this.flowHooks.onFlowStep( + chatId, + HandlerRole.OUTPUT, + suggestion.name, + suggestion.data ); + break; - if (orchestratorId) { - await this.orchestratorDb.addMessage( - orchestratorId, - HandlerRole.OUTPUT, - output.name, - output.data - ); - } - } else if (handler.role === HandlerRole.ACTION) { - // e.g. fetch data from an external API + case HandlerRole.ACTION: { const actionResult = await this.dispatchToAction( - output.name, - request, - output.data + suggestion.name, + suggestion.data ); - - this.logger.debug( - "Orchestrator.runAutonomousFlow", - "Dispatched action", - { - name: output.name, - data: output.data, - } + await this.flowHooks.onFlowStep( + chatId, + HandlerRole.ACTION, + suggestion.name, + { input: suggestion.data, result: actionResult } ); - - if (orchestratorId) { - await this.orchestratorDb.addMessage( - orchestratorId, - HandlerRole.ACTION, - output.name, - { - input: output.data, - result: actionResult, - } - ); - } - - // If the action returns new data, queue it up if (actionResult) { - if (Array.isArray(actionResult)) { - for (const item of actionResult) { - queue.push({ - data: item, - source: output.name, - }); - } - } else { + const newItems = Array.isArray(actionResult) + ? actionResult + : [actionResult]; + for (const newItem of newItems) { queue.push({ - data: actionResult, - source: output.name, + data: newItem, + source: suggestion.name, }); } } - } else { + break; + } + + default: this.logger.warn( - "Orchestrator.runAutonomousFlow", + "Orchestrator.processQueueItem", "Suggested output has an unrecognized role", handler.role ); - } } } } - // Return the final outputs array, or handle them in your own way return outputs; } /** - * Processes *any* content by splitting it into items (if needed) and - * calling the single-item processor. + * Processes content by handling both single items and arrays. + * A small delay is introduced for each item (if processing an array). */ public async processContent( - content: any, - source: string, - userId?: string + content: ProcessableContent | ProcessableContent[], + source: string ): Promise { if (Array.isArray(content)) { const allResults: ProcessedResult[] = []; for (const item of content) { - // Example delay to show chunk processing, remove if not needed - await new Promise((resolve) => setTimeout(resolve, 5000)); - const result = await this.processContentItem( - item, - source, - userId - ); + await this.delay(5000); // Example delay; remove if not needed. + const result = await this.processContentItem(item, source); if (result) { allResults.push(result); } @@ -510,53 +372,32 @@ export class Orchestrator { return allResults; } - const singleResult = await this.processContentItem( - content, - source, - userId - ); + const singleResult = await this.processContentItem(content, source); return singleResult ? [singleResult] : []; } /** - * Processes a single item of content: - * - Retrieves prior memories from its room - * - Lets the "master" processor handle it - * - Optionally saves the result to memory/marks it processed + * Processes a single content item: + * - Retrieves conversation context and prior memories. + * - Passes the item to the processor. + * - Updates conversation and memory. */ private async processContentItem( - content: any, - source: string, - userId?: string + content: ProcessableContent, + source: string ): Promise { - let memories: Memory[] = []; - - // If the content indicates a "room" property - if (content.room) { - const hasProcessed = - await this.roomManager.hasProcessedContentInRoom( - content.contentId, - content.room - ); - if (hasProcessed) { - this.logger.debug( - "Orchestrator.processContentItem", - "Content already processed", - { - contentId: content.contentId, - roomId: content.room, - userId, - } - ); - return null; - } + let memories: { memories: Memory[] } = { memories: [] }; + + const conversation = await this.flowHooks.onConversationCreated( + content.userId, + content.threadId, + source + ); - const room = await this.roomManager.ensureRoom( - content.room, - source, - userId + if (content.threadId && content.userId) { + memories = await this.flowHooks.onMemoriesRequested( + conversation.id ); - memories = await this.roomManager.getMemoriesFromRoom(room.id); this.logger.debug( "Orchestrator.processContentItem", @@ -564,14 +405,14 @@ export class Orchestrator { { content, source, - roomId: room.id, - userId, + conversationId: conversation.id, + userId: content.userId, relevantMemories: memories, } ); } - // Gather possible outputs & actions to pass to the Processor + // Collect available outputs and actions. const availableOutputs = Array.from(this.ioHandlers.values()).filter( (h) => h.role === HandlerRole.OUTPUT ); @@ -579,7 +420,7 @@ export class Orchestrator { (h) => h.role === HandlerRole.ACTION ); - // Processor's main entry point + // Process the content. const result = await this.processor.process( content, JSON.stringify(memories), @@ -589,23 +430,44 @@ export class Orchestrator { } ); - // If there's a room, save the memory and mark processed - if (content.room && result) { - await this.roomManager.addMemory( - content.room, - JSON.stringify(result.content), - { - source, - ...result.metadata, - ...result.enrichedContext, - } - ); - await this.roomManager.markContentAsProcessed( - content.contentId, - content.room - ); - } + // Save memory and update the conversation. + await this.flowHooks.onMemoryAdded( + conversation.id, + JSON.stringify(result.content), + source, + { + ...result.metadata, + ...result.enrichedContext, + } + ); + + this.logger.debug( + "Orchestrator.processContentItem", + "Updating conversation", + { + conversationId: conversation.id, + contentId: content.contentId, + threadId: content.threadId, + userId: content.userId, + result, + } + ); + + await this.flowHooks.onConversationUpdated( + content.contentId, + conversation.id, + JSON.stringify(result.content), + source, + result.metadata + ); return result; } + + /** + * A helper method to introduce a delay. + */ + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } } diff --git a/packages/core/src/core/processor.ts b/packages/core/src/core/processor.ts index 1e2491b4..70eee6ba 100644 --- a/packages/core/src/core/processor.ts +++ b/packages/core/src/core/processor.ts @@ -2,7 +2,7 @@ import { Logger } from "./logger"; import { LogLevel, type Character, type ProcessedResult } from "./types"; -import type { IOHandler } from "./types"; +import type { IOHandler, ProcessableContent } from "./types"; export abstract class BaseProcessor { /** Logger instance for this processor */ @@ -47,7 +47,7 @@ export abstract class BaseProcessor { * Processes the given content and returns a result. */ public abstract process( - content: any, + content: ProcessableContent, otherContext: string, ioContext?: { availableOutputs?: IOHandler[]; diff --git a/packages/core/src/core/processors/master-processor.ts b/packages/core/src/core/processors/master-processor.ts index 7cf122de..5f7652b9 100644 --- a/packages/core/src/core/processors/master-processor.ts +++ b/packages/core/src/core/processors/master-processor.ts @@ -4,6 +4,7 @@ import type { ActionIOHandler, Character, OutputIOHandler, + ProcessableContent, ProcessedResult, SuggestedOutput, } from "../types"; @@ -46,7 +47,7 @@ export class MasterProcessor extends BaseProcessor { } async process( - content: any, + content: ProcessableContent, otherContext: string, ioContext?: { availableOutputs: OutputIOHandler[]; diff --git a/packages/core/src/core/room-manager.ts b/packages/core/src/core/room-manager.ts deleted file mode 100644 index c820fe12..00000000 --- a/packages/core/src/core/room-manager.ts +++ /dev/null @@ -1,310 +0,0 @@ -import { Room } from "./room"; -import type { Memory, RoomMetadata } from "./types"; -import { ChromaVectorDB } from "./vector-db"; -import { Logger } from "./logger"; -import { LogLevel } from "./types"; - -export class RoomManager { - private logger: Logger; - - constructor( - private vectorDb?: ChromaVectorDB, - config: { - logLevel?: LogLevel; - } = {} - ) { - this.logger = new Logger({ - level: config.logLevel || LogLevel.INFO, - enableColors: true, - enableTimestamp: true, - }); - } - - public async getRoom(roomId: string): Promise { - if (!this.vectorDb) { - this.logger.warn("RoomManager.getRoom", "No VectorDB provided"); - return undefined; - } - - try { - const collection = await this.vectorDb.getCollectionForRoom(roomId); - const metadata = collection.metadata; - - if (!metadata?.platform || !metadata?.platformId) { - this.logger.warn( - "RoomManager.getRoom", - "Room missing required metadata", - { - roomId, - } - ); - return undefined; - } - - return new Room( - metadata.platformId as string, - metadata.platform as string, - { - name: metadata.name as string, - description: metadata.description as string, - participants: metadata.participants as string[], - createdAt: new Date( - metadata.created as string | number | Date - ), - lastActive: new Date( - (metadata.lastActive || metadata.created) as - | string - | number - | Date - ), - } - ); - } catch (error) { - this.logger.error("RoomManager.getRoom", "Failed to get room", { - error: error instanceof Error ? error.message : String(error), - roomId, - }); - return undefined; - } - } - - public async getRoomByPlatformId( - platformId: string, - platform: string - ): Promise { - if (platform === "consciousness") { - platformId = "main"; - } - - const roomId = Room.createDeterministicId(platform, platformId); - return this.getRoom(roomId); - } - - public async createRoom( - platformId: string, - platform: string, - metadata?: Partial - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for room creation"); - } - - const room = new Room(platformId, platform, metadata); - - try { - const collection = await this.vectorDb.getCollectionForRoom( - room.id - ); - - // Update collection with full room metadata including userId - await collection.modify({ - metadata: { - description: "Room-specific memory storage", - roomId: room.id, - platform: room.platform, - platformId: room.platformId, - created: room.getMetadata().createdAt.toISOString(), - lastActive: room.getMetadata().lastActive.toISOString(), - name: metadata?.name, - participants: metadata?.participants, - userId: metadata?.userId, // Include userId in collection metadata - }, - }); - - this.logger.debug( - "RoomManager.createRoom", - "Room collection created", - { - roomId: room.id, - platform, - platformId, - userId: metadata?.userId, // Log userId - } - ); - - return room; - } catch (error) { - this.logger.error( - "RoomManager.createRoom", - "Failed to create room collection", - { - error: - error instanceof Error ? error.message : String(error), - roomId: room.id, - userId: metadata?.userId, // Log userId in errors - } - ); - throw error; - } - } - - public async addMemory( - roomId: string, - content: string, - metadata?: Record - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for adding memories"); - } - - const room = await this.getRoom(roomId); - if (!room) { - throw new Error(`Room ${roomId} not found`); - } - - const memory = await room.addMemory(content, metadata); - - // Store in room-specific collection with userId from metadata - await this.vectorDb.storeInRoom(memory.content, room.id, { - memoryId: memory.id, - timestamp: memory.timestamp, - platform: room.platform, - userId: metadata?.userId, // Include userId in vector storage - ...metadata, - }); - - return memory; - } - - public async findSimilarMemoriesInRoom( - content: string, - roomId: string, - limit = 5 - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for finding memories"); - } - - const results = await this.vectorDb.findSimilarInRoom( - content, - roomId, - limit - ); - - return results.map((result) => ({ - id: result.metadata?.memoryId, - roomId: roomId, - content: result.content, - timestamp: new Date(result.metadata?.timestamp), - metadata: result.metadata, - })); - } - - public async listRooms(): Promise { - if (!this.vectorDb) { - return []; - } - - const roomIds = await this.vectorDb.listRooms(); - const rooms: Room[] = []; - - for (const roomId of roomIds) { - const room = await this.getRoom(roomId); - if (room) { - rooms.push(room); - } - } - - return rooms; - } - - public async ensureRoom( - name: string, - platform: string, - userId?: string - ): Promise { - let room = await this.getRoomByPlatformId(name, platform); - if (!room) { - room = await this.createRoom(name, platform, { - name, - description: `Room for ${name}`, - participants: [], - userId, // Add userId to metadata - }); - } - return room; - } - - public async deleteRoom(roomId: string): Promise { - if (!this.vectorDb) { - return; - } - - await this.vectorDb.deleteRoom(roomId); - this.logger.info("RoomManager.deleteRoom", "Room deleted", { roomId }); - } - - public async getMemoriesFromRoom( - roomId: string, - limit?: number - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for getting memories"); - } - - const room = await this.getRoom(roomId); - if (!room) { - throw new Error(`Room ${roomId} not found`); - } - - const memories = await this.vectorDb.getMemoriesFromRoom(roomId, limit); - - return memories.map((memory) => ({ - id: memory.metadata?.memoryId, - roomId: roomId, - content: memory.content, - timestamp: new Date(memory.metadata?.timestamp), - metadata: memory.metadata, - })); - } - - public async hasProcessedContentInRoom( - contentId: string, - roomId: string - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for getting memories"); - } - - const room = await this.getRoom(roomId); - if (!room) { - this.logger.error( - "RoomManager.markContentAsProcessed", - "Room not found", - { - roomId, - } - ); - return false; - } - - return await this.vectorDb.hasProcessedContent(contentId, room); - } - - public async markContentAsProcessed( - contentId: string, - roomId: string - ): Promise { - if (!this.vectorDb) { - throw new Error( - "VectorDB required for marking content as processed" - ); - } - - const room = await this.getRoom(roomId); - - if (!room) { - this.logger.error( - "RoomManager.markContentAsProcessed", - "Room not found", - { - roomId, - } - ); - return false; - } - - await this.vectorDb.markContentAsProcessed(contentId, room); - return true; - } -} diff --git a/packages/core/src/core/schedule-service.ts b/packages/core/src/core/schedule-service.ts index 0f768d85..f150da7f 100644 --- a/packages/core/src/core/schedule-service.ts +++ b/packages/core/src/core/schedule-service.ts @@ -1,13 +1,13 @@ import { Orchestrator } from "./orchestrator"; import { HandlerRole, type VectorDB } from "./types"; import type { Logger } from "./logger"; -import type { RoomManager } from "./room-manager"; +import type { ConversationManager } from "./conversation-manager"; import type { OrchestratorDb } from "./memory"; export interface IOrchestratorContext { logger: Logger; orchestratorDb: OrchestratorDb; - roomManager: RoomManager; + conversationManager: ConversationManager; vectorDb: VectorDB; } @@ -52,33 +52,20 @@ export class SchedulerService { case HandlerRole.INPUT: await this.orchestrator.dispatchToInput( task.handlerName, - { - headers: { - "x-user-id": task.userId, - }, - }, + data ); break; case HandlerRole.ACTION: await this.orchestrator.dispatchToAction( task.handlerName, - { - headers: { - "x-user-id": task.userId, - }, - }, + data ); break; case HandlerRole.OUTPUT: await this.orchestrator.dispatchToOutput( task.handlerName, - { - headers: { - "x-user-id": task.userId, - }, - }, data ); break; diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts index 3cec67fd..de95c34e 100644 --- a/packages/core/src/core/types/index.ts +++ b/packages/core/src/core/types/index.ts @@ -328,7 +328,7 @@ export interface Thought { type: string; source: string; metadata?: Record; - roomId?: string; + conversationId?: string; } export type ThoughtType = @@ -345,7 +345,7 @@ export interface ThoughtTemplate { temperature: number; } -export interface RoomMetadata { +export interface ConversationMetadata { name: string; description?: string; participants: string[]; @@ -356,7 +356,7 @@ export interface RoomMetadata { export interface Memory { id: string; - roomId: string; + conversationId: string; content: string; timestamp: Date; metadata?: Record; @@ -374,15 +374,15 @@ export interface VectorDB { delete(id: string): Promise; - storeInRoom( + storeInConversation( content: string, - roomId: string, + conversationId: string, metadata?: Record ): Promise; - findSimilarInRoom( + findSimilarInConversation( content: string, - roomId: string, + conversationId: string, limit?: number, metadata?: Record ): Promise; @@ -552,9 +552,11 @@ export interface InputIOHandler extends BaseIOHandler { /** Identifies this as an input handler */ role: HandlerRole.INPUT; /** Function to process input data */ - execute?: (data: any) => Promise; + execute?: (data: any) => Promise; /** Sets up a subscription to receive streaming data */ - subscribe?: (onData: (data: any) => void) => () => void; + subscribe?: ( + onData: (data: ProcessableContent | ProcessableContent[]) => void + ) => () => void; } /** @@ -580,7 +582,7 @@ export interface OutputIOHandler extends BaseIOHandler { /** Required schema to validate output data */ outputSchema: z.ZodType; /** Function to process and send output */ - execute?: (data: any) => Promise; + execute?: (data: any) => any; /** Sets up a subscription to handle output streams */ subscribe?: (onData: (data: any) => void) => () => void; } @@ -611,6 +613,79 @@ export interface ActionIOHandler extends BaseIOHandler { /** Union type of all possible IO handler types */ export type IOHandler = InputIOHandler | OutputIOHandler | ActionIOHandler; -export interface AgentRequest { - headers: Record; +/** + * Base interface for any content that can be processed + */ +export interface ProcessableContent { + contentId: string; + userId: string; + platformId: string; + threadId: string; + data: unknown; +} + +export interface Chat { + _id?: string; + userId: string; // the user the agent is interacting with could be an agent or a human + platformId: string; // e.g., "twitter", "telegram" + threadId: string; // platform-specific thread/conversation ID + createdAt: Date; + updatedAt: Date; + messages: ChatMessage[]; + metadata?: Record; // Platform-specific data +} + +export interface ChatMessage { + role: HandlerRole; + name: string; + data: unknown; + timestamp: Date; + messageId?: string; // Platform-specific message ID if available +} + +// Define interfaces matching MongoDB document shapes +export interface ScheduledTask { + _id: string; + userId: string; + handlerName: string; + taskData: Record; + nextRunAt: Date; + intervalMs?: number; + status: "pending" | "running" | "completed" | "failed"; + createdAt: Date; + updatedAt: Date; +} + +export interface OrchestratorMessage { + role: HandlerRole; + name: string; + data: unknown; + timestamp: Date; +} + +export interface OrchestratorChat { + _id?: string; + userId: string; + createdAt: Date; + updatedAt: Date; + messages: OrchestratorMessage[]; +} + +export interface Chat { + _id?: string; + userId: string; + platformId: string; // e.g., "twitter", "telegram" + threadId: string; // platform-specific thread/conversation ID + createdAt: Date; + updatedAt: Date; + messages: ChatMessage[]; + metadata?: Record; +} + +export interface ChatMessage { + role: HandlerRole; + name: string; + data: unknown; + timestamp: Date; + messageId?: string; // Platform-specific message ID if available } diff --git a/packages/core/src/core/vector-db.ts b/packages/core/src/core/vector-db.ts index 16f090f0..1146634d 100644 --- a/packages/core/src/core/vector-db.ts +++ b/packages/core/src/core/vector-db.ts @@ -2,7 +2,7 @@ import crypto from "crypto"; import { ChromaClient, IncludeEnum, OpenAIEmbeddingFunction } from "chromadb"; import { env } from "./env"; import { Logger } from "./logger"; -import { Room } from "./room"; +import { Conversation } from "./conversation"; import { LogLevel, type ClusterMetadata, @@ -18,7 +18,6 @@ import { type VectorDB, } from "./types"; import { isValidDateValue } from "./utils"; -import { CharacterTextSplitter } from "@langchain/textsplitters"; export class ChromaVectorDB implements VectorDB { // Static collection names @@ -191,7 +190,10 @@ export class ChromaVectorDB implements VectorDB { try { const collection = await this.getCollection(); // Generate deterministic ID so we don't accidentally store duplicates - const id = Room.createDeterministicMemoryId("global", content); + const id = Conversation.createDeterministicMemoryId( + "global", + content + ); this.logger.debug("ChromaVectorDB.store", "Storing content", { id, @@ -240,22 +242,22 @@ export class ChromaVectorDB implements VectorDB { } } - // ======================= ROOM-SPECIFIC METHODS ======================= + // ======================= conversation-SPECIFIC METHODS ======================= /** - * Returns (and creates if necessary) a separate collection for a given room. - * Rooms are typically namespaced as `room_`. + * Returns (and creates if necessary) a separate collection for a given conversation. + * conversations are typically namespaced as `conversation_`. */ - public async getCollectionForRoom(roomId: string) { - const collectionName = `room_${roomId}`; + public async getCollectionForConversation(conversationId: string) { + const collectionName = `conversation_${conversationId}`; return this.client.getOrCreateCollection({ name: collectionName, embeddingFunction: this.embedder, metadata: { - description: "Room-specific memory storage", - roomId, - platform: roomId.split("_")[0], - platformId: roomId.split("_")[0] + "_platform", // TODO: This is a hack to get the platform ID + description: "Conversation-specific memory storage", + conversationId, + platform: conversationId.split("_")[0], + platformId: conversationId.split("_")[0] + "_platform", // TODO: This is a hack to get the platform ID created: new Date().toISOString(), lastActive: new Date().toISOString(), }, @@ -263,39 +265,51 @@ export class ChromaVectorDB implements VectorDB { } /** - * Stores content in a specific room's memory, also associating it with a cluster ID. + * Stores content in a specific conversation's memory, also associating it with a cluster ID. */ - public async storeInRoom( + public async storeInConversation( content: string, - roomId: string, + conversationId: string, metadata: Record = {} ): Promise { try { // Add detailed logging - this.logger.debug("ChromaVectorDB.storeInRoom", "Storing content", { - content, - contentType: typeof content, - contentLength: content?.length, - roomId, - metadata, - }); + this.logger.debug( + "ChromaVectorDB.storeInConversation", + "Storing content", + { + content, + contentType: typeof content, + contentLength: content?.length, + conversationId, + metadata, + } + ); // Ensure content is a non-empty string if (!content || typeof content !== "string") { throw new Error(`Invalid content: ${typeof content}`); } - const collection = await this.getCollectionForRoom(roomId); - const id = Room.createDeterministicMemoryId(roomId, content); + const collection = + await this.getCollectionForConversation(conversationId); + const id = Conversation.createDeterministicMemoryId( + conversationId, + content + ); const timestamp = new Date(metadata.timestamp || Date.now()); - this.logger.debug("ChromaVectorDB.storeInRoom", "Generated ID", { - id, - roomId, - timestamp: timestamp.toISOString(), - }); + this.logger.debug( + "ChromaVectorDB.storeInConversation", + "Generated ID", + { + id, + conversationId, + timestamp: timestamp.toISOString(), + } + ); - // Update the room's metadata + // Update the conversation's metadata await collection.modify({ metadata: { ...collection.metadata, @@ -310,52 +324,57 @@ export class ChromaVectorDB implements VectorDB { metadatas: [ { ...metadata, - roomId, + conversationId, timestamp: timestamp.toISOString(), }, ], }); this.logger.debug( - "ChromaVectorDB.storeInRoom", + "ChromaVectorDB.storeInConversation", "Successfully stored", { id, - roomId, + conversationId, contentLength: content.length, } ); } catch (error) { - this.logger.error("ChromaVectorDB.storeInRoom", "Storage failed", { - error: error instanceof Error ? error.message : String(error), - content, - contentType: typeof content, - roomId, - }); + this.logger.error( + "ChromaVectorDB.storeInConversation", + "Storage failed", + { + error: + error instanceof Error ? error.message : String(error), + content, + contentType: typeof content, + conversationId, + } + ); throw error; } } /** - * Finds similar items in a given room's collection. If no cluster match, - * falls back to "global" search in that room's collection. + * Finds similar items in a given conversation's collection. If no cluster match, + * falls back to "global" search in that conversation's collection. */ - public async findSimilarInRoom( + public async findSimilarInConversation( content: string, - roomId: string, + conversationId: string, limit = 5, metadata?: Record ): Promise { try { // Add detailed logging this.logger.debug( - "ChromaVectorDB.findSimilarInRoom", + "ChromaVectorDB.findSimilarInconversation", "Input content details", { content, contentType: typeof content, contentLength: content?.length, - roomId, + conversationId, metadata, } ); @@ -363,25 +382,26 @@ export class ChromaVectorDB implements VectorDB { // Ensure content is a non-empty string if (!content || typeof content !== "string") { this.logger.warn( - "ChromaVectorDB.findSimilarInRoom", + "ChromaVectorDB.findSimilarInconversation", "Invalid content", { content, contentType: typeof content, - roomId, + conversationId, } ); return []; } - const collection = await this.getCollectionForRoom(roomId); + const collection = + await this.getCollectionForConversation(conversationId); this.logger.debug( - "ChromaVectorDB.findSimilarInRoom", + "ChromaVectorDB.findSimilarInConversation", "Querying collection", { queryText: content, - roomId, + conversationId, limit, metadata, } @@ -405,14 +425,14 @@ export class ChromaVectorDB implements VectorDB { })); } catch (error) { this.logger.error( - "ChromaVectorDB.findSimilarInRoom", + "ChromaVectorDB.findSimilarInconversation", "Search failed", { error: error instanceof Error ? error.message : String(error), content, contentType: typeof content, - roomId, + conversationId, } ); return []; @@ -420,15 +440,16 @@ export class ChromaVectorDB implements VectorDB { } /** - * Fallback search for a room: no cluster restriction, just raw similarity. + * Fallback search for a conversation: no cluster restriction, just raw similarity. */ - private async findSimilarInRoomGlobal( + private async findSimilarInConversationGlobal( content: string, - roomId: string, + conversationId: string, limit = 5, metadata?: Record ): Promise { - const collection = await this.getCollectionForRoom(roomId); + const collection = + await this.getCollectionForConversation(conversationId); const results = await collection.query({ queryTexts: [content], nResults: limit, @@ -462,41 +483,55 @@ export class ChromaVectorDB implements VectorDB { } /** - * Lists the known "room_..." collections. + * Lists the known "conversation_..." collections. */ - public async listRooms(): Promise { + public async listConversations(): Promise { const collections = await this.client.listCollections(); // If .listCollections() returns an array of objects that contain `name`, // you may need to adapt the .filter / .map return collections .map((c) => c) - .filter((name: string) => name.startsWith("room_")) - .map((name: string) => name.replace("room_", "")); + .filter((name: string) => name.startsWith("conversation_")) + .map((name: string) => name.replace("conversation_", "")); } /** - * Gets the memory count for a specific room. + * Gets the memory count for a specific conversation. */ - public async getRoomMemoryCount(roomId: string): Promise { - const collection = await this.getCollectionForRoom(roomId); + public async getConversationMemoryCount( + conversationId: string + ): Promise { + const collection = + await this.getCollectionForConversation(conversationId); return collection.count(); } /** - * Deletes an entire room's collection. + * Deletes an entire conversation's collection. */ - public async deleteRoom(roomId: string): Promise { + public async deleteConversation(conversationId: string): Promise { try { - await this.client.deleteCollection({ name: `room_${roomId}` }); - this.logger.info("ChromaVectorDB.deleteRoom", "Room deleted", { - roomId, + await this.client.deleteCollection({ + name: `conversation_${conversationId}`, }); + this.logger.info( + "ChromaVectorDB.deleteConversation", + "Conversation deleted", + { + conversationId, + } + ); } catch (error) { - this.logger.error("ChromaVectorDB.deleteRoom", "Deletion failed", { - error: error instanceof Error ? error.message : String(error), - roomId, - }); + this.logger.error( + "ChromaVectorDB.deleteConversation", + "Deletion failed", + { + error: + error instanceof Error ? error.message : String(error), + conversationId, + } + ); throw error; } } @@ -1593,10 +1628,12 @@ export class ChromaVectorDB implements VectorDB { // Check if we've already processed this content public async hasProcessedContent( contentId: string, - room: Room + conversation: Conversation ): Promise { try { - const collection = await this.getCollectionForRoom(room.id); + const collection = await this.getCollectionForConversation( + conversation.id + ); // Search for exact match of the content ID in metadata const results = await collection.get({ @@ -1617,7 +1654,7 @@ export class ChromaVectorDB implements VectorDB { error: error instanceof Error ? error.message : String(error), contentId, - roomId: room.id, + conversationId: conversation.id, } ); return false; @@ -1627,10 +1664,12 @@ export class ChromaVectorDB implements VectorDB { // Mark content as processed public async markContentAsProcessed( contentId: string, - room: Room + conversation: Conversation ): Promise { try { - const collection = await this.getCollectionForRoom(room.id); + const collection = await this.getCollectionForConversation( + conversation.id + ); const markerId = `processed_${contentId}`; await collection.add({ @@ -1650,7 +1689,7 @@ export class ChromaVectorDB implements VectorDB { "Marked content as processed", { contentId, - roomId: room.id, + conversationId: conversation.id, markerId, } ); @@ -1662,7 +1701,7 @@ export class ChromaVectorDB implements VectorDB { error: error instanceof Error ? error.message : String(error), contentId, - roomId: room.id, + conversationId: conversation.id, } ); throw error; @@ -1670,14 +1709,15 @@ export class ChromaVectorDB implements VectorDB { } /** - * Gets all memories from a specific room's collection, optionally limited to a certain number + * Gets all memories from a specific conversation's collection, optionally limited to a certain number */ - public async getMemoriesFromRoom( - roomId: string, + public async getMemoriesFromConversation( + conversationId: string, limit?: number ): Promise<{ content: string; metadata?: Record }[]> { try { - const collection = await this.getCollectionForRoom(roomId); + const collection = + await this.getCollectionForConversation(conversationId); // Get all documents from the collection, with optional limit const results = await collection.get({ @@ -1695,12 +1735,12 @@ export class ChromaVectorDB implements VectorDB { })); } catch (error) { this.logger.error( - "ChromaVectorDB.getMemoriesFromRoom", + "ChromaVectorDB.getMemoriesFromConversation", "Failed to get memories", { error: error instanceof Error ? error.message : String(error), - roomId, + conversationId, } ); throw error;