From a38607415704364072c6c9d61744dbcc479a1a12 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Fri, 31 Jan 2025 11:02:26 +1100 Subject: [PATCH 01/11] implements lifecycle hooks --- examples/example-api.ts | 2 +- examples/example-basic.ts | 8 +- examples/example-discord.ts | 20 +- examples/example-goal.ts | 16 +- examples/example-server.ts | 26 +- examples/example-twitter.ts | 72 ++-- .../core/src/core/__tests__/vector-db.test.ts | 14 +- packages/core/src/core/consciousness.ts | 29 +- .../core/src/core/conversation-manager.ts | 342 ++++++++++++++++++ .../src/core/{room.ts => conversation.ts} | 65 ++-- packages/core/src/core/index.ts | 8 +- packages/core/src/core/life-cycle.ts | 295 +++++++++++++++ packages/core/src/core/memory.ts | 25 +- packages/core/src/core/orchestrator.ts | 308 +++++++--------- packages/core/src/core/room-manager.ts | 310 ---------------- packages/core/src/core/schedule-service.ts | 4 +- packages/core/src/core/types/index.ts | 27 +- packages/core/src/core/vector-db.ts | 209 ++++++----- 18 files changed, 1075 insertions(+), 705 deletions(-) create mode 100644 packages/core/src/core/conversation-manager.ts rename packages/core/src/core/{room.ts => conversation.ts} (59%) create mode 100644 packages/core/src/core/life-cycle.ts delete mode 100644 packages/core/src/core/room-manager.ts diff --git a/examples/example-api.ts b/examples/example-api.ts index cb9d0de9..9e80ac2f 100644 --- a/examples/example-api.ts +++ b/examples/example-api.ts @@ -9,7 +9,7 @@ import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole } from "../packages/core/src/core/types"; -import { RoomManager } from "../packages/core/src/core/room-manager"; +import { RoomManager } from "../packages/core/src/core/conversation-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { ResearchQuantProcessor } from "../packages/core/src/core/processors/research-processor"; diff --git a/examples/example-basic.ts b/examples/example-basic.ts index 9731044d..fa87e74b 100644 --- a/examples/example-basic.ts +++ b/examples/example-basic.ts @@ -88,7 +88,9 @@ async function main() { role: HandlerRole.OUTPUT, execute: async (data: any) => { const result = await starknetChain.write(data.payload); - return `Transaction: ${JSON.stringify(result, null, 2)}`; + return { + content: `Transaction: ${JSON.stringify(result, null, 2)}`, + }; }, outputSchema: z .object({ @@ -123,7 +125,9 @@ async function main() { `query: ${query}`, `result: ${JSON.stringify(result, null, 2)}`, ].join("\n\n"); - return `GraphQL data fetched successfully: ${resultStr}`; + return { + content: `GraphQL data fetched successfully: ${resultStr}`, + }; }, outputSchema: z .object({ diff --git a/examples/example-discord.ts b/examples/example-discord.ts index 6007382d..d85bef0b 100644 --- a/examples/example-discord.ts +++ b/examples/example-discord.ts @@ -7,7 +7,7 @@ import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole, LogLevel } from "../packages/core/src/core/types"; import { DiscordClient } from "../packages/core/src/core/io/discord"; -import { RoomManager } from "../packages/core/src/core/room-manager"; +import { ConversationManager } from "../packages/core/src/core/conversation-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { 
LLMClient } from "../packages/core/src/core/llm-client"; @@ -19,6 +19,7 @@ import readline from "readline"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { Message } from "discord.js"; import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; +import { makeFlowLifecycle } from "../packages/core/src/core/life-cycle"; async function main() { // Set logging level as you see fit @@ -33,10 +34,10 @@ async function main() { // Optional: Purge previous session data if you want a fresh start await vectorDb.purge(); - const roomManager = new RoomManager(vectorDb); + const conversationManager = new ConversationManager(vectorDb); const llmClient = new LLMClient({ - model: "anthropic/claude-3-5-sonnet-latest", // Example model + model: "anthropic/claude-3-5-sonnet-latest", temperature: 0.3, }); @@ -46,15 +47,10 @@ async function main() { loglevel ); - // Initialize processor with default character personality - const messageProcessor = new MessageProcessor( - llmClient, - defaultCharacter, - loglevel + masterProcessor.addProcessor( + new MessageProcessor(llmClient, defaultCharacter, loglevel) ); - masterProcessor.addProcessor(messageProcessor); - // Connect to MongoDB (for scheduled tasks, if you use them) const scheduledTaskDb = new MongoDb( "mongodb://localhost:27017", @@ -69,10 +65,8 @@ async function main() { // Create the Orchestrator const core = new Orchestrator( - roomManager, - vectorDb, masterProcessor, - scheduledTaskDb, + makeFlowLifecycle(scheduledTaskDb, conversationManager), { level: loglevel, enableColors: true, diff --git a/examples/example-goal.ts b/examples/example-goal.ts index 5c5a80ad..e9bc4e9b 100644 --- a/examples/example-goal.ts +++ b/examples/example-goal.ts @@ -106,11 +106,13 @@ async function main() { role: HandlerRole.OUTPUT, execute: async (data: any) => { const result = await starknetChain.write(data.payload); - return `Transaction executed successfully: ${JSON.stringify( - result, - null, - 2 - )}`; + return { + content: `Transaction executed successfully: ${JSON.stringify( + result, + null, + 2 + )}`, + }; }, outputSchema: z .object({ @@ -145,7 +147,9 @@ async function main() { `query: ${query}`, `result: ${JSON.stringify(result, null, 2)}`, ].join("\n\n"); - return `GraphQL data fetched successfully: ${resultStr}`; + return { + content: `GraphQL data fetched successfully: ${resultStr}`, + }; }, outputSchema: z .object({ diff --git a/examples/example-server.ts b/examples/example-server.ts index 02eed6ed..5a74836d 100644 --- a/examples/example-server.ts +++ b/examples/example-server.ts @@ -12,24 +12,25 @@ import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole } from "../packages/core/src/core/types"; -import { RoomManager } from "../packages/core/src/core/room-manager"; +import { ConversationManager } from "../packages/core/src/core/conversation-manager"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { defaultCharacter } from "../packages/core/src/core/character"; import { LogLevel } from "../packages/core/src/core/types"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; +import { makeFlowLifecycle } from "../packages/core/src/core/life-cycle"; -const scheduledTaskDb = new MongoDb( +const kvDb = new MongoDb( "mongodb://localhost:27017", "myApp", 
"scheduled_tasks" ); -await scheduledTaskDb.connect(); +await kvDb.connect(); console.log(chalk.green("✅ Scheduled task database connected")); -await scheduledTaskDb.deleteAll(); +await kvDb.deleteAll(); // ------------------------------------------------------ // 1) CREATE DAYDREAMS AGENT @@ -50,7 +51,7 @@ async function createDaydreamsAgent() { }); // 1.3. Room manager initialization - const roomManager = new RoomManager(vectorDb); + const conversationManager = new ConversationManager(vectorDb); const masterProcessor = new MasterProcessor( llmClient, @@ -69,10 +70,8 @@ async function createDaydreamsAgent() { // 1.5. Initialize core system const orchestrator = new Orchestrator( - roomManager, - vectorDb, masterProcessor, - scheduledTaskDb, + makeFlowLifecycle(kvDb, conversationManager), { level: loglevel, enableColors: true, @@ -102,6 +101,10 @@ async function createDaydreamsAgent() { message: string; }; console.log(`Reply to user ${userId ?? "??"}: ${message}`); + return { + userId, + message, + }; }, }); @@ -158,7 +161,7 @@ wss.on("connection", (ws) => { "x-user-id": userId, }, }, - userMessage, + { content: userMessage }, orchestratorId ? orchestratorId : undefined ); @@ -211,8 +214,7 @@ app.get("/api/history/:userId", async (req, res) => { console.log("Fetching history for userId:", userId); // Get all orchestrator records for this user - const histories = - await scheduledTaskDb.getOrchestratorsByUserId(userId); + const histories = await kvDb.getOrchestratorsByUserId(userId); if (!histories || histories.length === 0) { console.log("No histories found"); @@ -241,7 +243,7 @@ app.get("/api/history/:userId/:chatId", async (req, res) => { return res.status(400).json({ error: "Invalid chat ID format" }); } - const history = await scheduledTaskDb.getOrchestratorById(objectId); + const history = await kvDb.getOrchestratorById(objectId); if (!history) { return res.status(404).json({ error: "History not found" }); diff --git a/examples/example-twitter.ts b/examples/example-twitter.ts index 7c7eae2e..097104bc 100644 --- a/examples/example-twitter.ts +++ b/examples/example-twitter.ts @@ -10,7 +10,7 @@ import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole } from "../packages/core/src/core/types"; import { TwitterClient } from "../packages/core/src/core/io/twitter"; -import { RoomManager } from "../packages/core/src/core/room-manager"; +import { ConversationManager } from "../packages/core/src/core/conversation-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { LLMClient } from "../packages/core/src/core/llm-client"; @@ -25,56 +25,59 @@ import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { SchedulerService } from "../packages/core/src/core/schedule-service"; import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; import { Logger } from "../packages/core/src/core/logger"; +import { makeFlowLifecycle } from "../packages/core/src/core/life-cycle"; async function main() { const loglevel = LogLevel.DEBUG; + // Initialize core dependencies const vectorDb = new ChromaVectorDB("twitter_agent", { chromaUrl: "http://localhost:8000", logLevel: loglevel, }); - await vectorDb.purge(); // Clear previous session data + // Clear previous session data + await vectorDb.purge(); - const roomManager = new RoomManager(vectorDb); + // Initialize room manager + const conversationManager = new 
ConversationManager(vectorDb); + // Initialize LLM client const llmClient = new LLMClient({ - model: "openrouter:deepseek/deepseek-r1-distill-llama-70b", + model: "anthropic/claude-3-5-sonnet-latest", temperature: 0.3, }); + // Initialize master processor const masterProcessor = new MasterProcessor( llmClient, defaultCharacter, loglevel ); - // Initialize processor with default character personality - const messageProcessor = new MessageProcessor( - llmClient, - defaultCharacter, - loglevel - ); - - masterProcessor.addProcessor(messageProcessor); + // Add message processor to master processor + masterProcessor.addProcessor([ + new MessageProcessor(llmClient, defaultCharacter, loglevel), + ]); - const scheduledTaskDb = new MongoDb( + // Initialize MongoDB for scheduled tasks + const kvDb = new MongoDb( "mongodb://localhost:27017", "myApp", "scheduled_tasks" ); - await scheduledTaskDb.connect(); + // Connect to MongoDB + await kvDb.connect(); console.log(chalk.green("✅ Scheduled task database connected")); - await scheduledTaskDb.deleteAll(); + // Delete previous data for testing + await kvDb.deleteAll(); // Initialize core system const orchestrator = new Orchestrator( - roomManager, - vectorDb, masterProcessor, - scheduledTaskDb, + makeFlowLifecycle(kvDb, conversationManager), { level: loglevel, enableColors: true, @@ -82,6 +85,7 @@ async function main() { } ); + // Initialize scheduler service const scheduler = new SchedulerService( { logger: new Logger({ @@ -89,14 +93,15 @@ async function main() { enableColors: true, enableTimestamp: true, }), - orchestratorDb: scheduledTaskDb, - roomManager: roomManager, + orchestratorDb: kvDb, + conversationManager: conversationManager, vectorDb: vectorDb, }, orchestrator, 10000 ); + // Start scheduler service scheduler.start(); // Set up Twitter client with credentials @@ -110,13 +115,13 @@ async function main() { ); // Initialize autonomous thought generation - const consciousness = new Consciousness(llmClient, roomManager, { + const consciousness = new Consciousness(llmClient, conversationManager, { intervalMs: 300000, // Think every 5 minutes minConfidence: 0.7, logLevel: loglevel, }); - // Register input handler for Twitter mentions + // Register input handler for Twitter mentions orchestrator.registerIOHandler({ name: "twitter_mentions", role: HandlerRole.INPUT, @@ -126,14 +131,15 @@ async function main() { const mentionsInput = twitter.createMentionsInput(60000); const mentions = await mentionsInput.handler(); - // If no new mentions, return null to skip processing - if (!mentions || mentions.length === 0) { - return null; + // If no new mentions, return an empty array to skip processing + if (!mentions || !mentions.length) { + return []; } + // Map mentions to the required format return mentions.map((mention) => ({ type: "tweet", - room: mention.metadata.conversationId, + conversationId: mention.metadata.conversationId, contentId: mention.metadata.tweetId, user: mention.metadata.username, content: mention.content, @@ -152,7 +158,7 @@ async function main() { // If no thought was generated or it was already processed, skip if (!thought || !thought.content) { - return null; + return []; } return thought; @@ -166,6 +172,7 @@ async function main() { execute: async (data: unknown) => { const thoughtData = data as { content: string }; + // Post thought to Twitter return twitter.createTweetOutput().handler({ content: thoughtData.content, }); @@ -184,14 +191,16 @@ async function main() { ), }); - // Schedule a task to run every minute - await 
scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000); // Check mentions every minute + // Schedule a task to check mentions every minute + await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 60000); + + // Schedule a task to generate thoughts every 5 minutes await scheduler.scheduleTaskInDb( "sleever", "consciousness_thoughts", {}, - 30000 - ); // Think every 5 minutes + 300000 + ); // Register output handler for Twitter replies orchestrator.registerIOHandler({ @@ -200,6 +209,7 @@ async function main() { execute: async (data: unknown) => { const tweetData = data as { content: string; inReplyTo: string }; + // Post reply to Twitter return twitter.createTweetOutput().handler(tweetData); }, outputSchema: z diff --git a/packages/core/src/core/__tests__/vector-db.test.ts b/packages/core/src/core/__tests__/vector-db.test.ts index d04c04ef..efe75b5b 100644 --- a/packages/core/src/core/__tests__/vector-db.test.ts +++ b/packages/core/src/core/__tests__/vector-db.test.ts @@ -143,17 +143,17 @@ describe("ChromaVectorDB", () => { }); }); - // Room Operations - describe("Room Operations", () => { - test("listRooms() should return room collections", async () => { + // Conversation Operations + describe("Conversation Operations", () => { + test("listConversations() should return conversation collections", async () => { mockClient.listCollections.mockResolvedValueOnce([ - "room_123", - "room_456", + "conversation_123", + "conversation_456", "other_collection", ]); - const rooms = await db.listRooms(); - expect(rooms).toEqual(["123", "456"]); + const conversations = await db.listConversations(); + expect(conversations).toEqual(["123", "456"]); }); }); diff --git a/packages/core/src/core/consciousness.ts b/packages/core/src/core/consciousness.ts index 2a3bb475..0f3d4eb5 100644 --- a/packages/core/src/core/consciousness.ts +++ b/packages/core/src/core/consciousness.ts @@ -1,20 +1,20 @@ import { Logger } from "./logger"; import { LLMClient } from "./llm-client"; -import { type Room } from "./room"; -import { type RoomManager } from "./room-manager"; +import { Conversation } from "./conversation"; +import { ConversationManager } from "./conversation-manager"; import { LogLevel, type Thought } from "./types"; import { validateLLMResponseSchema } from "./utils"; import { z } from "zod"; export class Consciousness { - private static readonly ROOM_ID = "consciousness_main"; + private static readonly CONVERSATION_ID = "consciousness_main"; private logger: Logger; private thoughtInterval: NodeJS.Timer | null = null; constructor( private llmClient: LLMClient, - private roomManager: RoomManager, + private conversationManager: ConversationManager, private config: { intervalMs?: number; minConfidence?: number; @@ -59,7 +59,7 @@ export class Consciousness { confidence: thought.confidence, suggestedActions: thought.context?.suggestedActions || [], - roomId: Consciousness.ROOM_ID, + conversationId: Consciousness.CONVERSATION_ID, }, }; } else { @@ -110,7 +110,7 @@ export class Consciousness { private async generateThought(): Promise { const recentMemories = this.getRecentMemories( - await this.roomManager.listRooms() + await this.conversationManager.listConversations() ); const prompt = `Analyze these recent memories and generate an insightful thought. 
@@ -229,21 +229,21 @@ export class Consciousness { } private getRecentMemories( - rooms: Room[], + conversations: Conversation[], limit: number = 10 - ): Array<{ content: string; roomId: string }> { + ): Array<{ content: string; conversationId: string }> { const allMemories: Array<{ content: string; - roomId: string; + conversationId: string; timestamp: Date; }> = []; - for (const room of rooms) { - const memories = room.getMemories(5); // Get last 5 memories from each room + for (const conversation of conversations) { + const memories = conversation.getMemories(5); // Get last 5 memories from each conversation allMemories.push( ...memories.map((m) => ({ content: m.content, - roomId: room.id, + conversationId: conversation.id, timestamp: m.timestamp, })) ); @@ -253,6 +253,9 @@ export class Consciousness { return allMemories .sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime()) .slice(0, limit) - .map(({ content, roomId }) => ({ content, roomId })); + .map(({ content, conversationId }) => ({ + content, + conversationId, + })); } } diff --git a/packages/core/src/core/conversation-manager.ts b/packages/core/src/core/conversation-manager.ts new file mode 100644 index 00000000..d687c633 --- /dev/null +++ b/packages/core/src/core/conversation-manager.ts @@ -0,0 +1,342 @@ +import { Conversation } from "./conversation"; +import type { Memory, ConversationMetadata } from "./types"; +import { ChromaVectorDB } from "./vector-db"; +import { Logger } from "./logger"; +import { LogLevel } from "./types"; + +export class ConversationManager { + private logger: Logger; + + constructor( + private vectorDb?: ChromaVectorDB, + config: { + logLevel?: LogLevel; + } = {} + ) { + this.logger = new Logger({ + level: config.logLevel || LogLevel.INFO, + enableColors: true, + enableTimestamp: true, + }); + } + + public async getConversation( + conversationId: string + ): Promise { + if (!this.vectorDb) { + this.logger.warn( + "ConversationManager.getConversation", + "No VectorDB provided" + ); + return undefined; + } + + try { + const collection = + await this.vectorDb.getCollectionForConversation( + conversationId + ); + const metadata = collection.metadata; + + if (!metadata?.platform || !metadata?.platformId) { + this.logger.warn( + "ConversationManager.getConversation", + "Conversation missing required metadata", + { + conversationId, + } + ); + return undefined; + } + + return new Conversation( + metadata.platformId as string, + metadata.platform as string, + { + name: metadata.name as string, + description: metadata.description as string, + participants: metadata.participants as string[], + createdAt: new Date( + metadata.created as string | number | Date + ), + lastActive: new Date( + (metadata.lastActive || metadata.created) as + | string + | number + | Date + ), + } + ); + } catch (error) { + this.logger.error( + "ConversationManager.getConversation", + "Failed to get conversation", + { + error: + error instanceof Error ? 
error.message : String(error), + conversationId, + } + ); + return undefined; + } + } + + public async getConversationByPlatformId( + platformId: string, + platform: string + ): Promise { + if (platform === "consciousness") { + platformId = "main"; + } + + const conversationId = Conversation.createDeterministicId( + platform, + platformId + ); + return this.getConversation(conversationId); + } + + public async createConversation( + platformId: string, + platform: string, + metadata?: Partial + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for conversation creation"); + } + + const conversation = new Conversation(platformId, platform, metadata); + + try { + const collection = await this.vectorDb.getCollectionForConversation( + conversation.id + ); + + // Update collection with full conversation metadata including userId + await collection.modify({ + metadata: { + description: "Conversation-specific memory storage", + conversationId: conversation.id, + platform: conversation.platform, + platformId: conversation.platformId, + created: conversation.getMetadata().createdAt.toISOString(), + lastActive: conversation + .getMetadata() + .lastActive.toISOString(), + name: metadata?.name, + participants: metadata?.participants, + userId: metadata?.userId, // Include userId in collection metadata + }, + }); + + this.logger.debug( + "ConversationManager.createConversation", + "Conversation collection created", + { + conversationId: conversation.id, + platform, + platformId, + userId: metadata?.userId, // Log userId + } + ); + + return conversation; + } catch (error) { + this.logger.error( + "ConversationManager.createConversation", + "Failed to create conversation collection", + { + error: + error instanceof Error ? error.message : String(error), + conversationId: conversation.id, + userId: metadata?.userId, // Log userId in errors + } + ); + throw error; + } + } + + public async addMemory( + conversationId: string, + content: string, + metadata?: Record + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for adding memories"); + } + + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new Error(`Conversation ${conversationId} not found`); + } + + const memory = await conversation.addMemory(content, metadata); + + // Store in conversation-specific collection with userId from metadata + await this.vectorDb.storeInConversation( + memory.content, + conversation.id, + { + memoryId: memory.id, + timestamp: memory.timestamp, + platform: conversation.platform, + userId: metadata?.userId, // Include userId in vector storage + ...metadata, + } + ); + + return memory; + } + + public async findSimilarMemoriesInConversation( + content: string, + conversationId: string, + limit = 5 + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for finding memories"); + } + + const results = await this.vectorDb.findSimilarInConversation( + content, + conversationId, + limit + ); + + return results.map((result) => ({ + id: result.metadata?.memoryId, + conversationId: conversationId, + content: result.content, + timestamp: new Date(result.metadata?.timestamp), + metadata: result.metadata, + })); + } + + public async listConversations(): Promise { + if (!this.vectorDb) { + return []; + } + + const conversationIds = await this.vectorDb.listConversations(); + const conversations: Conversation[] = []; + + for (const conversationId of conversationIds) { + const conversation = await 
this.getConversation(conversationId); + if (conversation) { + conversations.push(conversation); + } + } + + return conversations; + } + + public async ensureConversation( + name: string, + platform: string, + userId?: string + ): Promise { + let conversation = await this.getConversationByPlatformId( + name, + platform + ); + if (!conversation) { + conversation = await this.createConversation(name, platform, { + name, + description: `Conversation for ${name}`, + participants: [], + userId, // Add userId to metadata + }); + } + return conversation; + } + + public async deleteConversation(conversationId: string): Promise { + if (!this.vectorDb) { + return; + } + + await this.vectorDb.deleteConversation(conversationId); + this.logger.info( + "ConversationManager.deleteConversation", + "Conversation deleted", + { conversationId } + ); + } + + public async getMemoriesFromConversation( + conversationId: string, + limit?: number + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for getting memories"); + } + + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new Error(`Conversation ${conversationId} not found`); + } + + const memories = await this.vectorDb.getMemoriesFromConversation( + conversationId, + limit + ); + + return memories.map((memory) => ({ + id: memory.metadata?.memoryId, + conversationId: conversationId, + content: memory.content, + timestamp: new Date(memory.metadata?.timestamp), + metadata: memory.metadata, + })); + } + + public async hasProcessedContentInConversation( + contentId: string, + conversationId: string + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for getting memories"); + } + + const conversation = await this.getConversation(conversationId); + if (!conversation) { + this.logger.error( + "ConversationManager.markContentAsProcessed", + "Conversation not found", + { + conversationId, + } + ); + return false; + } + + return await this.vectorDb.hasProcessedContent(contentId, conversation); + } + + public async markContentAsProcessed( + contentId: string, + conversationId: string + ): Promise { + if (!this.vectorDb) { + throw new Error( + "VectorDB required for marking content as processed" + ); + } + + const conversation = await this.getConversation(conversationId); + + if (!conversation) { + this.logger.error( + "ConversationManager.markContentAsProcessed", + "Conversation not found", + { + conversationId, + } + ); + return false; + } + + await this.vectorDb.markContentAsProcessed(contentId, conversation); + return true; + } +} diff --git a/packages/core/src/core/room.ts b/packages/core/src/core/conversation.ts similarity index 59% rename from packages/core/src/core/room.ts rename to packages/core/src/core/conversation.ts index 8279d1a5..be78864a 100644 --- a/packages/core/src/core/room.ts +++ b/packages/core/src/core/conversation.ts @@ -1,33 +1,33 @@ import { createHash } from "crypto"; -import type { RoomMetadata } from "./types"; +import type { ConversationMetadata } from "./types"; import type { Memory } from "./types"; /** - * Represents a room/conversation context that can store memories and metadata. + * Represents a conversation context that can store memories and metadata. 
*/ -export class Room { - /** Unique identifier for the room */ +export class Conversation { + /** Unique identifier for the conversation */ public readonly id: string; - /** Collection of memories associated with this room */ + /** Collection of memories associated with this conversation */ private memories: Memory[] = []; - /** Metadata about the room like name, description, participants etc */ - private metadata: RoomMetadata; + /** Metadata about the conversation like name, description, participants etc */ + private metadata: ConversationMetadata; /** - * Creates a new Room instance + * Creates a new Conversation instance * @param platformId - Platform-specific identifier (e.g. tweet thread ID, chat ID) - * @param platform - Platform name where this room exists - * @param metadata - Optional metadata to initialize the room with + * @param platform - Platform name where this conversation exists + * @param metadata - Optional metadata to initialize the conversation with */ constructor( public readonly platformId: string, public readonly platform: string, - metadata?: Partial + metadata?: Partial ) { - this.id = Room.createDeterministicId(platform, platformId); + this.id = Conversation.createDeterministicId(platform, platformId); this.metadata = { - name: metadata?.name || `Room ${platformId}`, + name: metadata?.name || `Conversation ${platformId}`, description: metadata?.description, participants: metadata?.participants || [], createdAt: metadata?.createdAt || new Date(), @@ -37,10 +37,10 @@ export class Room { } /** - * Creates a deterministic room ID based on platform and platformId + * Creates a deterministic conversation ID based on platform and platformId * @param platform - Platform name * @param platformId - Platform-specific identifier - * @returns A deterministic room ID string + * @returns A deterministic conversation ID string */ public static createDeterministicId( platform: string, @@ -55,7 +55,7 @@ export class Room { } /** - * Adds a new memory to the room + * Adds a new memory to the conversation * @param content - Content of the memory * @param metadata - Optional metadata for the memory * @returns The created Memory object @@ -64,12 +64,15 @@ export class Room { content: string, metadata?: Record ): Promise { - // Create deterministic memory ID based on room ID and content - const memoryId = Room.createDeterministicMemoryId(this.id, content); + // Create deterministic memory ID based on Conversation ID and content + const memoryId = Conversation.createDeterministicMemoryId( + this.id, + content + ); const memory: Memory = { id: memoryId, - roomId: this.id, + conversationId: this.id, content, timestamp: new Date(), metadata, @@ -82,17 +85,17 @@ export class Room { } /** - * Creates a deterministic memory ID based on room ID and content - * @param roomId - ID of the room + * Creates a deterministic memory ID based on conversation ID and content + * @param conversationId - ID of the conversation * @param content - Content of the memory * @returns A deterministic memory ID string */ public static createDeterministicMemoryId( - roomId: string, + conversationId: string, content: string ): string { const hash = createHash("sha256") - .update(`${roomId}:${content}`) + .update(`${conversationId}:${content}`) .digest("hex") .slice(0, 16); @@ -100,7 +103,7 @@ export class Room { } /** - * Retrieves memories from the room + * Retrieves memories from the conversation * @param limit - Optional limit on number of memories to return * @returns Array of Memory objects */ @@ -109,18 
+112,18 @@ export class Room { } /** - * Gets a copy of the room's metadata - * @returns Copy of room metadata + * Gets a copy of the conversation's metadata + * @returns Copy of conversation metadata */ - public getMetadata(): RoomMetadata { + public getMetadata(): ConversationMetadata { return { ...this.metadata }; } /** - * Updates the room's metadata + * Updates the conversation's metadata * @param update - Partial metadata object with fields to update */ - public updateMetadata(update: Partial): void { + public updateMetadata(update: Partial): void { this.metadata = { ...this.metadata, ...update, @@ -129,8 +132,8 @@ export class Room { } /** - * Converts the room instance to a plain object - * @returns Plain object representation of the room + * Converts the conversation instance to a plain object + * @returns Plain object representation of the conversation */ public toJSON() { return { diff --git a/packages/core/src/core/index.ts b/packages/core/src/core/index.ts index c77b2054..80a5ca1b 100644 --- a/packages/core/src/core/index.ts +++ b/packages/core/src/core/index.ts @@ -1,6 +1,6 @@ import { Orchestrator } from "./orchestrator"; -import { RoomManager } from "./room-manager"; -import { Room } from "./room"; +import { ConversationManager } from "./conversation-manager"; +import { Conversation } from "./conversation"; import { ChromaVectorDB } from "./vector-db"; import { BaseProcessor } from "./processor"; import { GoalManager } from "./goal-manager"; @@ -32,8 +32,8 @@ export { Orchestrator, Processors, Providers, - Room, - RoomManager, + Conversation, + ConversationManager, StepManager, Types, Utils, diff --git a/packages/core/src/core/life-cycle.ts b/packages/core/src/core/life-cycle.ts new file mode 100644 index 00000000..edd14ff6 --- /dev/null +++ b/packages/core/src/core/life-cycle.ts @@ -0,0 +1,295 @@ +// MyFlowLifecycle.ts + +import { HandlerRole, type Memory } from "./types"; +// Suppose we have an OrchestratorDb or some DB client: +import type { OrchestratorDb, OrchestratorMessage } from "./memory"; +import type { ConversationManager } from "./conversation-manager"; +import type { Conversation } from "./conversation"; + +export function makeFlowLifecycle( + orchestratorDb: OrchestratorDb, + conversationManager: ConversationManager +): FlowLifecycle { + return { + async onFlowStart( + userId: string, + sourceName: string, + initialData: unknown + ): Promise { + return await orchestratorDb.createOrchestrator(userId); + }, + + async onFlowStep( + orchestratorId: string | undefined, + userId: string, + role: HandlerRole, + sourceName: string, + data: unknown + ): Promise { + if (!orchestratorId) return; + await orchestratorDb.addMessage( + orchestratorId, + role, + sourceName, + data + ); + }, + + async onTasksScheduled( + userId: string, + tasks: { name: string; data: unknown; intervalMs?: number }[] + ): Promise { + for (const task of tasks) { + const now: number = Date.now(); + const nextRunAt: Date = new Date(now + (task.intervalMs ?? 
0)); + await orchestratorDb.createTask( + userId, + task.name, + { + request: task.name, + task_data: JSON.stringify(task.data), + }, + nextRunAt, + task.intervalMs + ); + } + }, + + async onOutputDispatched( + orchestratorId: string | undefined, + userId: string, + outputName: string, + outputData: unknown + ): Promise { + if (!orchestratorId) return; + await orchestratorDb.addMessage( + orchestratorId, + HandlerRole.OUTPUT, + outputName, + outputData + ); + }, + + async onActionDispatched( + orchestratorId: string | undefined, + userId: string, + actionName: string, + inputData: unknown, + result: unknown + ): Promise { + if (!orchestratorId) return; + await orchestratorDb.addMessage( + orchestratorId, + HandlerRole.ACTION, + actionName, + { + input: inputData, + result, + } + ); + }, + + async onContentProcessed( + contentId: string, + conversationId: string, + content: string, + metadata?: Record + ): Promise { + const hasProcessed = + await conversationManager.hasProcessedContentInConversation( + contentId, + conversationId + ); + if (hasProcessed) { + return; + } + await conversationManager.markContentAsProcessed( + contentId, + conversationId + ); + }, + + async onConversationCreated( + userId: string, + conversationId: string, + source: string + ): Promise { + return await conversationManager.ensureConversation( + conversationId, + source, + userId + ); + }, + + async onConversationUpdated( + contentId: string, + conversationId: string, + content: string, + source: string, + updates: Record + ): Promise { + await conversationManager.markContentAsProcessed( + contentId, + conversationId + ); + }, + + async onMemoryAdded( + conversationId: string, + content: string, + source: string, + updates: Record + ): Promise { + await conversationManager.addMemory(conversationId, content, { + source, + ...updates, + }); + }, + + async onMemoriesRequested( + conversationId: string, + limit?: number + ): Promise<{ memories: Memory[]; chatHistory: OrchestratorMessage[] }> { + // get vector based memories + // todo, we could base this on a userID so the agent has memories across conversations + const memories = + await conversationManager.getMemoriesFromConversation( + conversationId, + limit + ); + // TODO: get history from db + const chatHistory = + await orchestratorDb.getMessages(conversationId); + + return { memories, chatHistory }; + }, + + async onCheckContentProcessed( + contentId: string, + conversationId: string + ): Promise { + return await conversationManager.hasProcessedContentInConversation( + contentId, + conversationId + ); + }, + }; +} + +/** + * A set of optional lifecycle callbacks for orchestrator events. + * The Orchestrator will call these methods if they are defined. + */ +export interface FlowLifecycle { + /** + * Called when a new flow is started or continued. + * Allows you to create or fetch an Orchestrator record, returning an ID if relevant. + */ + onFlowStart?( + userId: string, + sourceName: string, + initialData: unknown + ): Promise; + + /** + * Called when new data is processed in the flow (e.g., an input message). + */ + onFlowStep?( + orchestratorId: string | undefined, + userId: string, + role: HandlerRole, + sourceName: string, + data: unknown + ): Promise; + + /** + * Called when the Orchestrator wants to schedule tasks (e.g. recurring tasks). + * You can store them in your DB or in a queue system. 
+ */ + onTasksScheduled?( + userId: string, + tasks: { + name: string; + data: unknown; + intervalMs?: number; + }[] + ): Promise; + + /** + * Called after an output is dispatched (e.g. store it or log it). + */ + onOutputDispatched?( + orchestratorId: string | undefined, + userId: string, + outputName: string, + outputData: unknown + ): Promise; + + /** + * Called after an action is dispatched (e.g. store it or log it). + */ + onActionDispatched?( + orchestratorId: string | undefined, + userId: string, + actionName: string, + inputData: unknown, + result: unknown + ): Promise; + + /** + * Called when content has been processed in a conversation + */ + onContentProcessed?( + userId: string, + conversationId: string, + content: string, + metadata?: Record + ): Promise; + + /** + * Called when a new conversation is created + */ + onConversationCreated( + userId: string, + conversationId: string, + source: string, + metadata?: Record + ): Promise; + + /** + * Called when a conversation is updated + */ + onConversationUpdated?( + contentId: string, + conversationId: string, + content: string, + source: string, + updates: Record + ): Promise; + + /** + * Called when a new memory needs to be added to a conversation + */ + onMemoryAdded?( + conversationId: string, + content: string, + source: string, + updates: Record + ): Promise; + + /** + * Called when memories need to be retrieved for a conversation + */ + onMemoriesRequested?( + conversationId: string, + limit?: number + ): Promise<{ memories: Memory[]; chatHistory: OrchestratorMessage[] }>; + + /** + * Called to check if specific content has been processed in a conversation + */ + onCheckContentProcessed?( + contentId: string, + conversationId: string + ): Promise; +} diff --git a/packages/core/src/core/memory.ts b/packages/core/src/core/memory.ts index c5410973..05b0b3ad 100644 --- a/packages/core/src/core/memory.ts +++ b/packages/core/src/core/memory.ts @@ -1,5 +1,5 @@ import type { HandlerRole, Memory } from "./types"; -import type { Room } from "./room"; +import type { Conversation } from "./conversation"; // Define interfaces matching MongoDB document shapes export interface ScheduledTask { @@ -62,12 +62,23 @@ export interface OrchestratorDb { } export interface MemoryManager { - hasProcessedContentInRoom( + hasProcessedContentInConversation( contentId: string, - roomId: string + conversationId: string ): Promise; - ensureRoom(roomId: string, source: string, userId?: string): Promise; - getMemoriesFromRoom(roomId: string): Promise; - addMemory(roomId: string, content: string, metadata?: any): Promise; - markContentAsProcessed(contentId: string, roomId: string): Promise; + ensureConversation( + conversationId: string, + source: string, + userId?: string + ): Promise; + getMemoriesFromConversation(conversationId: string): Promise; + addMemory( + conversationId: string, + content: string, + metadata?: any + ): Promise; + markContentAsProcessed( + contentId: string, + conversationId: string + ): Promise; } diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index bdb55099..a515aa55 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -1,16 +1,17 @@ import { Logger } from "./logger"; -import { RoomManager } from "./room-manager"; import type { BaseProcessor } from "./processor"; -import type { AgentRequest, Memory, ProcessedResult, VectorDB } from "./types"; +import type { + AgentRequest, + Memory, + ProcessableContent, + ProcessedResult, +} from 
"./types"; import { HandlerRole, LogLevel, type LoggerConfig } from "./types"; import type { IOHandler } from "./types"; -import type { OrchestratorDb } from "./memory"; +import type { FlowLifecycle } from "./life-cycle"; +import type { OrchestratorMessage } from "./memory"; -/** - * Orchestrator system that manages both "input" and "output" handlers - * in a unified manner, along with scheduling recurring inputs. - */ export class Orchestrator { /** * Unified collection of IOHandlers (both input & output). @@ -23,31 +24,17 @@ export class Orchestrator { */ private readonly logger: Logger; - /** - * orchestratorDb instance for database operations. - */ - private readonly orchestratorDb: OrchestratorDb; - /** * Map of unsubscribe functions for various handlers. * Keyed by handler name. */ private unsubscribers = new Map void>(); - /** - * Other references in your system. Adjust as needed. - */ - public readonly vectorDb: VectorDB; - constructor( - private readonly roomManager: RoomManager, - vectorDb: VectorDB, private processor: BaseProcessor, - orchestratorDb: OrchestratorDb, + private readonly flowLifecycle: FlowLifecycle, config?: LoggerConfig ) { - this.vectorDb = vectorDb; - this.orchestratorDb = orchestratorDb; this.logger = new Logger( config ?? { level: LogLevel.ERROR, @@ -82,7 +69,7 @@ export class Orchestrator { this.ioHandlers.set(handler.name, handler); - if (handler.role === HandlerRole.INPUT && handler.subscribe) { + if (handler.role === "input" && handler.subscribe) { const unsubscribe = handler.subscribe(async (data) => { this.logger.info( "Orchestrator.registerIOHandler", @@ -90,7 +77,6 @@ export class Orchestrator { { data } ); // Simulate a request-like object here if you want a consistent approach. - // this will register as an agent request const fakeRequest: AgentRequest = { headers: {} }; // Whenever data arrives, pass it into runAutonomousFlow await this.runAutonomousFlow(fakeRequest, data, handler.name); @@ -130,7 +116,7 @@ export class Orchestrator { public async dispatchToOutput( name: string, request: AgentRequest, - data: T + data: ProcessableContent ): Promise { const handler = this.ioHandlers.get(name); if (!handler || !handler.execute) { @@ -174,7 +160,7 @@ export class Orchestrator { public async dispatchToAction( name: string, request: AgentRequest, - data: T + data: ProcessableContent ): Promise { const handler = this.ioHandlers.get(name); if (!handler || !handler.execute) { @@ -217,7 +203,7 @@ export class Orchestrator { public async dispatchToInput( name: string, request: AgentRequest, - data: T, + data: ProcessableContent, orchestratorId?: string ): Promise { const handler = this.ioHandlers.get(name); @@ -273,7 +259,6 @@ export class Orchestrator { const queue: Array<{ data: unknown; source: string }> = []; - // If the initial data is already an array, enqueue each item if (Array.isArray(initialData)) { for (const item of initialData) { queue.push({ data: item, source: sourceName }); @@ -282,112 +267,65 @@ export class Orchestrator { queue.push({ data: initialData, source: sourceName }); } - // Optionally store final outputs to return or do something with them + // Optionally store final outputs to return const outputs: Array<{ name: string; data: any }> = []; - // If orchestratorId is provided, verify it in the DB or create a new record - if (orchestratorId) { - const existing = - await this.orchestratorDb.getOrchestratorById(orchestratorId); - if (!existing) { - orchestratorId = - await this.orchestratorDb.createOrchestrator(userId); - } - } - - // 
Otherwise, create a new orchestrator record if needed - if (!orchestratorId) { - orchestratorId = - await this.orchestratorDb.createOrchestrator(userId); - } - - // Record initial data as an input message - if (orchestratorId) { - await this.orchestratorDb.addMessage( - orchestratorId, - HandlerRole.INPUT, + // 1. Fire the "flow start" hook (could create or fetch an orchestrator record). + if (this.flowLifecycle?.onFlowStart) { + const maybeId = await this.flowLifecycle.onFlowStart( + userId, sourceName, initialData ); - this.logger.debug( - "Orchestrator.runAutonomousFlow", - "Created or continued orchestrator record", - { - orchestratorId, - userId, - } - ); + if (maybeId) { + orchestratorId = maybeId; + } } - // Process items in a queue + // 2. Process items in a queue while (queue.length > 0) { const { data, source } = queue.shift()!; - // Record each chunk of data if you want - if (orchestratorId) { - await this.orchestratorDb.addMessage( + // 2a. Notify the flowStep hook that we have new input + if (this.flowLifecycle?.onFlowStep) { + await this.flowLifecycle.onFlowStep( orchestratorId, + userId, HandlerRole.INPUT, source, data ); - - this.logger.debug( - "Orchestrator.runAutonomousFlow", - "Added message to orchestrator record", - { - orchestratorId, - message: { - role: HandlerRole.INPUT, - name: source, - data, - }, - } - ); } - // processContent can return an array of ProcessedResult + // 2b. The main processing const processedResults = await this.processContent( - data, + data as ProcessableContent | ProcessableContent[], source, userId ); - if (!processedResults || processedResults.length === 0) { continue; } for (const processed of processedResults) { - // If the item was already processed, skip if (processed.alreadyProcessed) continue; - // Possibly schedule any tasks in the DB - if (processed.updateTasks) { - for (const task of processed.updateTasks) { - const now = Date.now(); - const nextRunAt = new Date( - now + (task.intervalMs ?? 0) - ); - this.logger.info( - "Orchestrator.runAutonomousFlow", - `Scheduling task ${task.name}`, - { nextRunAt, intervalMs: task.intervalMs } - ); - - await this.orchestratorDb.createTask( - userId, - task.name, - { - request: task.name, - task_data: JSON.stringify(task.data), - }, - nextRunAt, - task.intervalMs - ); - } + // 2c. If we have tasks to schedule, pass them to the hook + if ( + processed.updateTasks?.length && + this.flowLifecycle?.onTasksScheduled + ) { + await this.flowLifecycle.onTasksScheduled( + userId, + processed.updateTasks.map((task) => ({ + name: task.name, + data: task.data, + intervalMs: task.intervalMs, + })) + ); } - // For each suggested output or action + // 2d. For each suggested output/action, dispatch them for (const output of processed.suggestedOutputs ?? []) { const handler = this.ioHandlers.get(output.name); if (!handler) { @@ -399,7 +337,6 @@ export class Orchestrator { } if (handler.role === HandlerRole.OUTPUT) { - // e.g. 
send a Slack message outputs.push({ name: output.name, data: output.data }); await this.dispatchToOutput( output.name, @@ -407,49 +344,49 @@ export class Orchestrator { output.data ); - this.logger.debug( - "Orchestrator.runAutonomousFlow", - "Dispatched output", - { - name: output.name, - data: output.data, - } - ); - - if (orchestratorId) { - await this.orchestratorDb.addMessage( + // Notify the flowStep hook that we output something + if (this.flowLifecycle?.onFlowStep) { + await this.flowLifecycle.onFlowStep( orchestratorId, + userId, HandlerRole.OUTPUT, output.name, output.data ); } + // Or specifically call onOutputDispatched if you prefer: + if (this.flowLifecycle?.onOutputDispatched) { + await this.flowLifecycle.onOutputDispatched( + orchestratorId, + userId, + output.name, + output.data + ); + } } else if (handler.role === HandlerRole.ACTION) { - // e.g. fetch data from an external API const actionResult = await this.dispatchToAction( output.name, request, output.data ); - this.logger.debug( - "Orchestrator.runAutonomousFlow", - "Dispatched action", - { - name: output.name, - data: output.data, - } - ); - - if (orchestratorId) { - await this.orchestratorDb.addMessage( + // Notify a flow step or action-dispatched hook + if (this.flowLifecycle?.onFlowStep) { + await this.flowLifecycle.onFlowStep( orchestratorId, + userId, HandlerRole.ACTION, output.name, - { - input: output.data, - result: actionResult, - } + { input: output.data, result: actionResult } + ); + } + if (this.flowLifecycle?.onActionDispatched) { + await this.flowLifecycle.onActionDispatched( + orchestratorId, + userId, + output.name, + output.data, + actionResult ); } @@ -480,7 +417,7 @@ export class Orchestrator { } } - // Return the final outputs array, or handle them in your own way + // 3. Return final outputs, or handle them as you see fit return outputs; } @@ -489,14 +426,14 @@ export class Orchestrator { * calling the single-item processor. 
*/ public async processContent( - content: any, + content: ProcessableContent | ProcessableContent[], source: string, userId?: string ): Promise { if (Array.isArray(content)) { const allResults: ProcessedResult[] = []; for (const item of content) { - // Example delay to show chunk processing, remove if not needed + // Example delay: remove if not needed await new Promise((resolve) => setTimeout(resolve, 5000)); const result = await this.processContentItem( item, @@ -519,59 +456,75 @@ export class Orchestrator { } /** - * Processes a single item of content: - * - Retrieves prior memories from its room - * - Lets the "master" processor handle it - * - Optionally saves the result to memory/marks it processed + * Process a single item: + * - Retrieves prior memories from its conversation + * - Passes it to the main (or "master") processor + * - Saves result to memory & marks processed if relevant */ private async processContentItem( - content: any, + content: ProcessableContent, source: string, userId?: string ): Promise { - let memories: Memory[] = []; - - // If the content indicates a "room" property - if (content.room) { + let memories: { + memories: Memory[]; + chatHistory: OrchestratorMessage[]; + } = { memories: [], chatHistory: [] }; + + if ( + content.conversationId && + content.contentId && + this.flowLifecycle?.onCheckContentProcessed + ) { const hasProcessed = - await this.roomManager.hasProcessedContentInRoom( + await this.flowLifecycle.onCheckContentProcessed( content.contentId, - content.room + content.conversationId ); + if (hasProcessed) { this.logger.debug( "Orchestrator.processContentItem", "Content already processed", { contentId: content.contentId, - roomId: content.room, + conversationId: content.conversationId, userId, } ); return null; } - const room = await this.roomManager.ensureRoom( - content.room, - source, - userId - ); - memories = await this.roomManager.getMemoriesFromRoom(room.id); + if (userId && this.flowLifecycle?.onMemoriesRequested) { + // Ensure the conversation + const conversation = + await this.flowLifecycle.onConversationCreated( + userId, + content.conversationId, + source + ); - this.logger.debug( - "Orchestrator.processContentItem", - "Processing content with context", - { - content, - source, - roomId: room.id, - userId, - relevantMemories: memories, - } - ); + memories = await this.flowLifecycle.onMemoriesRequested( + conversation.id + ); + + this.logger.debug( + "Orchestrator.processContentItem", + "Processing content with context", + { + content, + source, + conversationId: conversation.id, + userId, + relevantMemories: memories, + } + ); + } + + // Grab memories } - // Gather possible outputs & actions to pass to the Processor + // Gather possible outputs & actions const availableOutputs = Array.from(this.ioHandlers.values()).filter( (h) => h.role === HandlerRole.OUTPUT ); @@ -579,7 +532,7 @@ export class Orchestrator { (h) => h.role === HandlerRole.ACTION ); - // Processor's main entry point + // Processor main entry point const result = await this.processor.process( content, JSON.stringify(memories), @@ -589,20 +542,29 @@ export class Orchestrator { } ); - // If there's a room, save the memory and mark processed - if (content.room && result) { - await this.roomManager.addMemory( - content.room, + // If there's a conversationId, store the memory and mark processed + if ( + content.conversationId && + result && + content.contentId && + this.flowLifecycle?.onMemoryAdded && + this.flowLifecycle?.onConversationUpdated + ) { + await 
this.flowLifecycle.onMemoryAdded( + content.conversationId, JSON.stringify(result.content), + source, { - source, ...result.metadata, ...result.enrichedContext, } ); - await this.roomManager.markContentAsProcessed( + await this.flowLifecycle.onConversationUpdated( content.contentId, - content.room + content.conversationId, + JSON.stringify(result.content), + source, + result.metadata ); } diff --git a/packages/core/src/core/room-manager.ts b/packages/core/src/core/room-manager.ts deleted file mode 100644 index c820fe12..00000000 --- a/packages/core/src/core/room-manager.ts +++ /dev/null @@ -1,310 +0,0 @@ -import { Room } from "./room"; -import type { Memory, RoomMetadata } from "./types"; -import { ChromaVectorDB } from "./vector-db"; -import { Logger } from "./logger"; -import { LogLevel } from "./types"; - -export class RoomManager { - private logger: Logger; - - constructor( - private vectorDb?: ChromaVectorDB, - config: { - logLevel?: LogLevel; - } = {} - ) { - this.logger = new Logger({ - level: config.logLevel || LogLevel.INFO, - enableColors: true, - enableTimestamp: true, - }); - } - - public async getRoom(roomId: string): Promise { - if (!this.vectorDb) { - this.logger.warn("RoomManager.getRoom", "No VectorDB provided"); - return undefined; - } - - try { - const collection = await this.vectorDb.getCollectionForRoom(roomId); - const metadata = collection.metadata; - - if (!metadata?.platform || !metadata?.platformId) { - this.logger.warn( - "RoomManager.getRoom", - "Room missing required metadata", - { - roomId, - } - ); - return undefined; - } - - return new Room( - metadata.platformId as string, - metadata.platform as string, - { - name: metadata.name as string, - description: metadata.description as string, - participants: metadata.participants as string[], - createdAt: new Date( - metadata.created as string | number | Date - ), - lastActive: new Date( - (metadata.lastActive || metadata.created) as - | string - | number - | Date - ), - } - ); - } catch (error) { - this.logger.error("RoomManager.getRoom", "Failed to get room", { - error: error instanceof Error ? 
error.message : String(error), - roomId, - }); - return undefined; - } - } - - public async getRoomByPlatformId( - platformId: string, - platform: string - ): Promise { - if (platform === "consciousness") { - platformId = "main"; - } - - const roomId = Room.createDeterministicId(platform, platformId); - return this.getRoom(roomId); - } - - public async createRoom( - platformId: string, - platform: string, - metadata?: Partial - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for room creation"); - } - - const room = new Room(platformId, platform, metadata); - - try { - const collection = await this.vectorDb.getCollectionForRoom( - room.id - ); - - // Update collection with full room metadata including userId - await collection.modify({ - metadata: { - description: "Room-specific memory storage", - roomId: room.id, - platform: room.platform, - platformId: room.platformId, - created: room.getMetadata().createdAt.toISOString(), - lastActive: room.getMetadata().lastActive.toISOString(), - name: metadata?.name, - participants: metadata?.participants, - userId: metadata?.userId, // Include userId in collection metadata - }, - }); - - this.logger.debug( - "RoomManager.createRoom", - "Room collection created", - { - roomId: room.id, - platform, - platformId, - userId: metadata?.userId, // Log userId - } - ); - - return room; - } catch (error) { - this.logger.error( - "RoomManager.createRoom", - "Failed to create room collection", - { - error: - error instanceof Error ? error.message : String(error), - roomId: room.id, - userId: metadata?.userId, // Log userId in errors - } - ); - throw error; - } - } - - public async addMemory( - roomId: string, - content: string, - metadata?: Record - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for adding memories"); - } - - const room = await this.getRoom(roomId); - if (!room) { - throw new Error(`Room ${roomId} not found`); - } - - const memory = await room.addMemory(content, metadata); - - // Store in room-specific collection with userId from metadata - await this.vectorDb.storeInRoom(memory.content, room.id, { - memoryId: memory.id, - timestamp: memory.timestamp, - platform: room.platform, - userId: metadata?.userId, // Include userId in vector storage - ...metadata, - }); - - return memory; - } - - public async findSimilarMemoriesInRoom( - content: string, - roomId: string, - limit = 5 - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for finding memories"); - } - - const results = await this.vectorDb.findSimilarInRoom( - content, - roomId, - limit - ); - - return results.map((result) => ({ - id: result.metadata?.memoryId, - roomId: roomId, - content: result.content, - timestamp: new Date(result.metadata?.timestamp), - metadata: result.metadata, - })); - } - - public async listRooms(): Promise { - if (!this.vectorDb) { - return []; - } - - const roomIds = await this.vectorDb.listRooms(); - const rooms: Room[] = []; - - for (const roomId of roomIds) { - const room = await this.getRoom(roomId); - if (room) { - rooms.push(room); - } - } - - return rooms; - } - - public async ensureRoom( - name: string, - platform: string, - userId?: string - ): Promise { - let room = await this.getRoomByPlatformId(name, platform); - if (!room) { - room = await this.createRoom(name, platform, { - name, - description: `Room for ${name}`, - participants: [], - userId, // Add userId to metadata - }); - } - return room; - } - - public async deleteRoom(roomId: string): Promise { - if (!this.vectorDb) { 
- return; - } - - await this.vectorDb.deleteRoom(roomId); - this.logger.info("RoomManager.deleteRoom", "Room deleted", { roomId }); - } - - public async getMemoriesFromRoom( - roomId: string, - limit?: number - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for getting memories"); - } - - const room = await this.getRoom(roomId); - if (!room) { - throw new Error(`Room ${roomId} not found`); - } - - const memories = await this.vectorDb.getMemoriesFromRoom(roomId, limit); - - return memories.map((memory) => ({ - id: memory.metadata?.memoryId, - roomId: roomId, - content: memory.content, - timestamp: new Date(memory.metadata?.timestamp), - metadata: memory.metadata, - })); - } - - public async hasProcessedContentInRoom( - contentId: string, - roomId: string - ): Promise { - if (!this.vectorDb) { - throw new Error("VectorDB required for getting memories"); - } - - const room = await this.getRoom(roomId); - if (!room) { - this.logger.error( - "RoomManager.markContentAsProcessed", - "Room not found", - { - roomId, - } - ); - return false; - } - - return await this.vectorDb.hasProcessedContent(contentId, room); - } - - public async markContentAsProcessed( - contentId: string, - roomId: string - ): Promise { - if (!this.vectorDb) { - throw new Error( - "VectorDB required for marking content as processed" - ); - } - - const room = await this.getRoom(roomId); - - if (!room) { - this.logger.error( - "RoomManager.markContentAsProcessed", - "Room not found", - { - roomId, - } - ); - return false; - } - - await this.vectorDb.markContentAsProcessed(contentId, room); - return true; - } -} diff --git a/packages/core/src/core/schedule-service.ts b/packages/core/src/core/schedule-service.ts index 0f768d85..7fdb9a31 100644 --- a/packages/core/src/core/schedule-service.ts +++ b/packages/core/src/core/schedule-service.ts @@ -1,13 +1,13 @@ import { Orchestrator } from "./orchestrator"; import { HandlerRole, type VectorDB } from "./types"; import type { Logger } from "./logger"; -import type { RoomManager } from "./room-manager"; +import type { ConversationManager } from "./conversation-manager"; import type { OrchestratorDb } from "./memory"; export interface IOrchestratorContext { logger: Logger; orchestratorDb: OrchestratorDb; - roomManager: RoomManager; + conversationManager: ConversationManager; vectorDb: VectorDB; } diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts index 3cec67fd..ee185524 100644 --- a/packages/core/src/core/types/index.ts +++ b/packages/core/src/core/types/index.ts @@ -328,7 +328,7 @@ export interface Thought { type: string; source: string; metadata?: Record; - roomId?: string; + conversationId?: string; } export type ThoughtType = @@ -345,7 +345,7 @@ export interface ThoughtTemplate { temperature: number; } -export interface RoomMetadata { +export interface ConversationMetadata { name: string; description?: string; participants: string[]; @@ -356,7 +356,7 @@ export interface RoomMetadata { export interface Memory { id: string; - roomId: string; + conversationId: string; content: string; timestamp: Date; metadata?: Record; @@ -374,15 +374,15 @@ export interface VectorDB { delete(id: string): Promise; - storeInRoom( + storeInConversation( content: string, - roomId: string, + conversationId: string, metadata?: Record ): Promise; - findSimilarInRoom( + findSimilarInConversation( content: string, - roomId: string, + conversationId: string, limit?: number, metadata?: Record ): Promise; @@ -552,7 +552,7 @@ export interface 
InputIOHandler extends BaseIOHandler { /** Identifies this as an input handler */ role: HandlerRole.INPUT; /** Function to process input data */ - execute?: (data: any) => Promise; + execute?: (data: any) => Promise; /** Sets up a subscription to receive streaming data */ subscribe?: (onData: (data: any) => void) => () => void; } @@ -580,7 +580,7 @@ export interface OutputIOHandler extends BaseIOHandler { /** Required schema to validate output data */ outputSchema: z.ZodType; /** Function to process and send output */ - execute?: (data: any) => Promise; + execute?: (data: any) => Promise; /** Sets up a subscription to handle output streams */ subscribe?: (onData: (data: any) => void) => () => void; } @@ -614,3 +614,12 @@ export type IOHandler = InputIOHandler | OutputIOHandler | ActionIOHandler; export interface AgentRequest { headers: Record; } + +/** + * Base interface for any content that can be processed + */ +export interface ProcessableContent { + conversationId?: string; + contentId?: string; + [key: string]: any; // Allow additional properties +} diff --git a/packages/core/src/core/vector-db.ts b/packages/core/src/core/vector-db.ts index 16f090f0..6cf74c70 100644 --- a/packages/core/src/core/vector-db.ts +++ b/packages/core/src/core/vector-db.ts @@ -2,7 +2,7 @@ import crypto from "crypto"; import { ChromaClient, IncludeEnum, OpenAIEmbeddingFunction } from "chromadb"; import { env } from "./env"; import { Logger } from "./logger"; -import { Room } from "./room"; +import { Conversation } from "./conversation"; import { LogLevel, type ClusterMetadata, @@ -191,7 +191,10 @@ export class ChromaVectorDB implements VectorDB { try { const collection = await this.getCollection(); // Generate deterministic ID so we don't accidentally store duplicates - const id = Room.createDeterministicMemoryId("global", content); + const id = Conversation.createDeterministicMemoryId( + "global", + content + ); this.logger.debug("ChromaVectorDB.store", "Storing content", { id, @@ -240,22 +243,22 @@ export class ChromaVectorDB implements VectorDB { } } - // ======================= ROOM-SPECIFIC METHODS ======================= + // ======================= conversation-SPECIFIC METHODS ======================= /** - * Returns (and creates if necessary) a separate collection for a given room. - * Rooms are typically namespaced as `room_`. + * Returns (and creates if necessary) a separate collection for a given conversation. + * conversations are typically namespaced as `conversation_`. */ - public async getCollectionForRoom(roomId: string) { - const collectionName = `room_${roomId}`; + public async getCollectionForConversation(conversationId: string) { + const collectionName = `conversation_${conversationId}`; return this.client.getOrCreateCollection({ name: collectionName, embeddingFunction: this.embedder, metadata: { - description: "Room-specific memory storage", - roomId, - platform: roomId.split("_")[0], - platformId: roomId.split("_")[0] + "_platform", // TODO: This is a hack to get the platform ID + description: "Conversation-specific memory storage", + conversationId, + platform: conversationId.split("_")[0], + platformId: conversationId.split("_")[0] + "_platform", // TODO: This is a hack to get the platform ID created: new Date().toISOString(), lastActive: new Date().toISOString(), }, @@ -263,39 +266,51 @@ export class ChromaVectorDB implements VectorDB { } /** - * Stores content in a specific room's memory, also associating it with a cluster ID. 
+ * Stores content in a specific conversation's memory, also associating it with a cluster ID. */ - public async storeInRoom( + public async storeInConversation( content: string, - roomId: string, + conversationId: string, metadata: Record = {} ): Promise { try { // Add detailed logging - this.logger.debug("ChromaVectorDB.storeInRoom", "Storing content", { - content, - contentType: typeof content, - contentLength: content?.length, - roomId, - metadata, - }); + this.logger.debug( + "ChromaVectorDB.storeInConversation", + "Storing content", + { + content, + contentType: typeof content, + contentLength: content?.length, + conversationId, + metadata, + } + ); // Ensure content is a non-empty string if (!content || typeof content !== "string") { throw new Error(`Invalid content: ${typeof content}`); } - const collection = await this.getCollectionForRoom(roomId); - const id = Room.createDeterministicMemoryId(roomId, content); + const collection = + await this.getCollectionForConversation(conversationId); + const id = Conversation.createDeterministicMemoryId( + conversationId, + content + ); const timestamp = new Date(metadata.timestamp || Date.now()); - this.logger.debug("ChromaVectorDB.storeInRoom", "Generated ID", { - id, - roomId, - timestamp: timestamp.toISOString(), - }); + this.logger.debug( + "ChromaVectorDB.storeInConversation", + "Generated ID", + { + id, + conversationId, + timestamp: timestamp.toISOString(), + } + ); - // Update the room's metadata + // Update the conversation's metadata await collection.modify({ metadata: { ...collection.metadata, @@ -310,52 +325,57 @@ export class ChromaVectorDB implements VectorDB { metadatas: [ { ...metadata, - roomId, + conversationId, timestamp: timestamp.toISOString(), }, ], }); this.logger.debug( - "ChromaVectorDB.storeInRoom", + "ChromaVectorDB.storeInConversation", "Successfully stored", { id, - roomId, + conversationId, contentLength: content.length, } ); } catch (error) { - this.logger.error("ChromaVectorDB.storeInRoom", "Storage failed", { - error: error instanceof Error ? error.message : String(error), - content, - contentType: typeof content, - roomId, - }); + this.logger.error( + "ChromaVectorDB.storeInConversation", + "Storage failed", + { + error: + error instanceof Error ? error.message : String(error), + content, + contentType: typeof content, + conversationId, + } + ); throw error; } } /** - * Finds similar items in a given room's collection. If no cluster match, - * falls back to "global" search in that room's collection. + * Finds similar items in a given conversation's collection. If no cluster match, + * falls back to "global" search in that conversation's collection. 
*/ - public async findSimilarInRoom( + public async findSimilarInConversation( content: string, - roomId: string, + conversationId: string, limit = 5, metadata?: Record ): Promise { try { // Add detailed logging this.logger.debug( - "ChromaVectorDB.findSimilarInRoom", + "ChromaVectorDB.findSimilarInconversation", "Input content details", { content, contentType: typeof content, contentLength: content?.length, - roomId, + conversationId, metadata, } ); @@ -363,25 +383,26 @@ export class ChromaVectorDB implements VectorDB { // Ensure content is a non-empty string if (!content || typeof content !== "string") { this.logger.warn( - "ChromaVectorDB.findSimilarInRoom", + "ChromaVectorDB.findSimilarInconversation", "Invalid content", { content, contentType: typeof content, - roomId, + conversationId, } ); return []; } - const collection = await this.getCollectionForRoom(roomId); + const collection = + await this.getCollectionForConversation(conversationId); this.logger.debug( - "ChromaVectorDB.findSimilarInRoom", + "ChromaVectorDB.findSimilarInConversation", "Querying collection", { queryText: content, - roomId, + conversationId, limit, metadata, } @@ -405,14 +426,14 @@ export class ChromaVectorDB implements VectorDB { })); } catch (error) { this.logger.error( - "ChromaVectorDB.findSimilarInRoom", + "ChromaVectorDB.findSimilarInconversation", "Search failed", { error: error instanceof Error ? error.message : String(error), content, contentType: typeof content, - roomId, + conversationId, } ); return []; @@ -420,15 +441,16 @@ export class ChromaVectorDB implements VectorDB { } /** - * Fallback search for a room: no cluster restriction, just raw similarity. + * Fallback search for a conversation: no cluster restriction, just raw similarity. */ - private async findSimilarInRoomGlobal( + private async findSimilarInConversationGlobal( content: string, - roomId: string, + conversationId: string, limit = 5, metadata?: Record ): Promise { - const collection = await this.getCollectionForRoom(roomId); + const collection = + await this.getCollectionForConversation(conversationId); const results = await collection.query({ queryTexts: [content], nResults: limit, @@ -462,41 +484,55 @@ export class ChromaVectorDB implements VectorDB { } /** - * Lists the known "room_..." collections. + * Lists the known "conversation_..." collections. */ - public async listRooms(): Promise { + public async listConversations(): Promise { const collections = await this.client.listCollections(); // If .listCollections() returns an array of objects that contain `name`, // you may need to adapt the .filter / .map return collections .map((c) => c) - .filter((name: string) => name.startsWith("room_")) - .map((name: string) => name.replace("room_", "")); + .filter((name: string) => name.startsWith("conversation_")) + .map((name: string) => name.replace("conversation_", "")); } /** - * Gets the memory count for a specific room. + * Gets the memory count for a specific conversation. */ - public async getRoomMemoryCount(roomId: string): Promise { - const collection = await this.getCollectionForRoom(roomId); + public async getConversationMemoryCount( + conversationId: string + ): Promise { + const collection = + await this.getCollectionForConversation(conversationId); return collection.count(); } /** - * Deletes an entire room's collection. + * Deletes an entire conversation's collection. 
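
The hunks above rename every room-scoped memory helper to its conversation-scoped equivalent. As a quick orientation, a minimal usage sketch follows; it is editorial rather than part of the patch, it assumes the method shapes shown in these hunks, the function name conversationMemoryExample is invented, and the conversation id and message text are made-up values.

import type { ChromaVectorDB } from "../packages/core/src/core/vector-db";

async function conversationMemoryExample(vectorDb: ChromaVectorDB) {
    const conversationId = "discord_123456789"; // illustrative id only

    // Store a memory in the conversation's dedicated collection.
    await vectorDb.storeInConversation(
        "User asked about the deployment schedule",
        conversationId,
        { userId: "user_42", timestamp: Date.now() }
    );

    // Query only that conversation for semantically similar memories.
    const similar = await vectorDb.findSimilarInConversation(
        "When are we deploying?",
        conversationId,
        5
    );
    console.log(similar.map((hit) => hit.content));

    // Housekeeping helpers introduced by the rename.
    const count = await vectorDb.getConversationMemoryCount(conversationId);
    if (count === 0) {
        await vectorDb.deleteConversation(conversationId);
    }
}
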
*/ - public async deleteRoom(roomId: string): Promise { + public async deleteConversation(conversationId: string): Promise { try { - await this.client.deleteCollection({ name: `room_${roomId}` }); - this.logger.info("ChromaVectorDB.deleteRoom", "Room deleted", { - roomId, + await this.client.deleteCollection({ + name: `conversation_${conversationId}`, }); + this.logger.info( + "ChromaVectorDB.deleteConversation", + "Conversation deleted", + { + conversationId, + } + ); } catch (error) { - this.logger.error("ChromaVectorDB.deleteRoom", "Deletion failed", { - error: error instanceof Error ? error.message : String(error), - roomId, - }); + this.logger.error( + "ChromaVectorDB.deleteConversation", + "Deletion failed", + { + error: + error instanceof Error ? error.message : String(error), + conversationId, + } + ); throw error; } } @@ -1593,10 +1629,12 @@ export class ChromaVectorDB implements VectorDB { // Check if we've already processed this content public async hasProcessedContent( contentId: string, - room: Room + conversation: Conversation ): Promise { try { - const collection = await this.getCollectionForRoom(room.id); + const collection = await this.getCollectionForConversation( + conversation.id + ); // Search for exact match of the content ID in metadata const results = await collection.get({ @@ -1617,7 +1655,7 @@ export class ChromaVectorDB implements VectorDB { error: error instanceof Error ? error.message : String(error), contentId, - roomId: room.id, + conversationId: conversation.id, } ); return false; @@ -1627,10 +1665,12 @@ export class ChromaVectorDB implements VectorDB { // Mark content as processed public async markContentAsProcessed( contentId: string, - room: Room + conversation: Conversation ): Promise { try { - const collection = await this.getCollectionForRoom(room.id); + const collection = await this.getCollectionForConversation( + conversation.id + ); const markerId = `processed_${contentId}`; await collection.add({ @@ -1650,7 +1690,7 @@ export class ChromaVectorDB implements VectorDB { "Marked content as processed", { contentId, - roomId: room.id, + conversationId: conversation.id, markerId, } ); @@ -1662,7 +1702,7 @@ export class ChromaVectorDB implements VectorDB { error: error instanceof Error ? error.message : String(error), contentId, - roomId: room.id, + conversationId: conversation.id, } ); throw error; @@ -1670,14 +1710,15 @@ export class ChromaVectorDB implements VectorDB { } /** - * Gets all memories from a specific room's collection, optionally limited to a certain number + * Gets all memories from a specific conversation's collection, optionally limited to a certain number */ - public async getMemoriesFromRoom( - roomId: string, + public async getMemoriesFromConversation( + conversationId: string, limit?: number ): Promise<{ content: string; metadata?: Record }[]> { try { - const collection = await this.getCollectionForRoom(roomId); + const collection = + await this.getCollectionForConversation(conversationId); // Get all documents from the collection, with optional limit const results = await collection.get({ @@ -1695,12 +1736,12 @@ export class ChromaVectorDB implements VectorDB { })); } catch (error) { this.logger.error( - "ChromaVectorDB.getMemoriesFromRoom", + "ChromaVectorDB.getMemoriesFromConversation", "Failed to get memories", { error: error instanceof Error ? 
error.message : String(error), - roomId, + conversationId, } ); throw error; From 19a6d0c42a0313f9f675ce19fed3f6574221b10b Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Fri, 31 Jan 2025 13:44:10 +1100 Subject: [PATCH 02/11] minor update --- packages/core/src/core/orchestrator.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index a515aa55..08997c35 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -520,8 +520,6 @@ export class Orchestrator { } ); } - - // Grab memories } // Gather possible outputs & actions From 22927234019ceedda739b25be4985ef88f3296c6 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Fri, 31 Jan 2025 13:58:18 +1100 Subject: [PATCH 03/11] hooks --- packages/core/src/core/orchestrator.ts | 74 +++++++++++--------------- 1 file changed, 31 insertions(+), 43 deletions(-) diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index 08997c35..cb72a923 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -240,13 +240,13 @@ export class Orchestrator { } /** - * Takes some incoming piece of data, processes it through the system, - * and handles any follow-on "action" or "output" suggestions in a queue. + * Processes incoming data through the system and manages any follow-on + * "action" or "output" suggestions in a queue. * - * @param request A request-like object (headers, etc.) from which we extract user info - * @param initialData The data payload to process - * @param sourceName The IOHandler name that provided this data - * @param orchestratorId An optional existing orchestrator record ID to tie into + * @param request A request-like object containing headers and other metadata + * @param initialData The data payload to be processed + * @param sourceName The name of the IOHandler that provided this data + * @param orchestratorId An optional existing orchestrator record ID to associate with */ private async runAutonomousFlow( request: AgentRequest, @@ -254,23 +254,18 @@ export class Orchestrator { sourceName: string, orchestratorId?: string ) { - // For illustration, extract userId from headers. Adjust the header name as needed. const userId = request.headers["x-user-id"] || "agent"; - const queue: Array<{ data: unknown; source: string }> = []; + // Initialize the processing queue with the initial data + const queue: Array<{ data: unknown; source: string }> = Array.isArray( + initialData + ) + ? initialData.map((item) => ({ data: item, source: sourceName })) + : [{ data: initialData, source: sourceName }]; - if (Array.isArray(initialData)) { - for (const item of initialData) { - queue.push({ data: item, source: sourceName }); - } - } else { - queue.push({ data: initialData, source: sourceName }); - } - - // Optionally store final outputs to return const outputs: Array<{ name: string; data: any }> = []; - // 1. Fire the "flow start" hook (could create or fetch an orchestrator record). + // Trigger the onFlowStart hook if defined if (this.flowLifecycle?.onFlowStart) { const maybeId = await this.flowLifecycle.onFlowStart( userId, @@ -282,11 +277,11 @@ export class Orchestrator { } } - // 2. Process items in a queue + // Process each item in the queue while (queue.length > 0) { const { data, source } = queue.shift()!; - // 2a. 
Notify the flowStep hook that we have new input + // Notify the onFlowStep hook of new input if (this.flowLifecycle?.onFlowStep) { await this.flowLifecycle.onFlowStep( orchestratorId, @@ -297,12 +292,13 @@ export class Orchestrator { ); } - // 2b. The main processing + // Main processing of the data const processedResults = await this.processContent( data as ProcessableContent | ProcessableContent[], source, userId ); + if (!processedResults || processedResults.length === 0) { continue; } @@ -310,7 +306,7 @@ export class Orchestrator { for (const processed of processedResults) { if (processed.alreadyProcessed) continue; - // 2c. If we have tasks to schedule, pass them to the hook + // Schedule tasks if any are present if ( processed.updateTasks?.length && this.flowLifecycle?.onTasksScheduled @@ -325,7 +321,7 @@ export class Orchestrator { ); } - // 2d. For each suggested output/action, dispatch them + // Dispatch suggested outputs or actions for (const output of processed.suggestedOutputs ?? []) { const handler = this.ioHandlers.get(output.name); if (!handler) { @@ -344,7 +340,6 @@ export class Orchestrator { output.data ); - // Notify the flowStep hook that we output something if (this.flowLifecycle?.onFlowStep) { await this.flowLifecycle.onFlowStep( orchestratorId, @@ -354,7 +349,7 @@ export class Orchestrator { output.data ); } - // Or specifically call onOutputDispatched if you prefer: + if (this.flowLifecycle?.onOutputDispatched) { await this.flowLifecycle.onOutputDispatched( orchestratorId, @@ -370,8 +365,10 @@ export class Orchestrator { output.data ); - // Notify a flow step or action-dispatched hook - if (this.flowLifecycle?.onFlowStep) { + if ( + this.flowLifecycle?.onActionDispatched && + this.flowLifecycle?.onFlowStep + ) { await this.flowLifecycle.onFlowStep( orchestratorId, userId, @@ -379,8 +376,6 @@ export class Orchestrator { output.name, { input: output.data, result: actionResult } ); - } - if (this.flowLifecycle?.onActionDispatched) { await this.flowLifecycle.onActionDispatched( orchestratorId, userId, @@ -390,20 +385,13 @@ export class Orchestrator { ); } - // If the action returns new data, queue it up + // Queue new data from action results if (actionResult) { - if (Array.isArray(actionResult)) { - for (const item of actionResult) { - queue.push({ - data: item, - source: output.name, - }); - } - } else { - queue.push({ - data: actionResult, - source: output.name, - }); + const newItems = Array.isArray(actionResult) + ? actionResult + : [actionResult]; + for (const item of newItems) { + queue.push({ data: item, source: output.name }); } } } else { @@ -417,7 +405,7 @@ export class Orchestrator { } } - // 3. Return final outputs, or handle them as you see fit + // Return the final outputs return outputs; } From 8b6cfb035b5f33664982f8759f281fab3964d50d Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Fri, 31 Jan 2025 14:02:05 +1100 Subject: [PATCH 04/11] refactor --- packages/core/src/core/orchestrator.ts | 171 ++++++++++++++----------- 1 file changed, 94 insertions(+), 77 deletions(-) diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index cb72a923..a353d0cb 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -253,35 +253,36 @@ export class Orchestrator { initialData: unknown, sourceName: string, orchestratorId?: string - ) { - const userId = request.headers["x-user-id"] || "agent"; + ): Promise> { + const userId = request.headers["x-user-id"] ?? 
"agent"; - // Initialize the processing queue with the initial data + // Prepare the processing queue const queue: Array<{ data: unknown; source: string }> = Array.isArray( initialData ) ? initialData.map((item) => ({ data: item, source: sourceName })) : [{ data: initialData, source: sourceName }]; + // Initialize an array to collect outputs const outputs: Array<{ name: string; data: any }> = []; - // Trigger the onFlowStart hook if defined + // Trigger onFlowStart, possibly updating orchestratorId if (this.flowLifecycle?.onFlowStart) { - const maybeId = await this.flowLifecycle.onFlowStart( + const flowId = await this.flowLifecycle.onFlowStart( userId, sourceName, initialData ); - if (maybeId) { - orchestratorId = maybeId; + if (flowId) { + orchestratorId = flowId; } } - // Process each item in the queue + // Process the queue until empty while (queue.length > 0) { const { data, source } = queue.shift()!; - // Notify the onFlowStep hook of new input + // Notify of an incoming step if (this.flowLifecycle?.onFlowStep) { await this.flowLifecycle.onFlowStep( orchestratorId, @@ -292,28 +293,31 @@ export class Orchestrator { ); } - // Main processing of the data + // Main content processing const processedResults = await this.processContent( data as ProcessableContent | ProcessableContent[], source, userId ); - if (!processedResults || processedResults.length === 0) { + if (!processedResults?.length) { continue; } - for (const processed of processedResults) { - if (processed.alreadyProcessed) continue; + // Handle each processed result + for (const result of processedResults) { + if (result.alreadyProcessed) { + continue; + } - // Schedule tasks if any are present + // Schedule tasks if present if ( - processed.updateTasks?.length && + result.updateTasks?.length && this.flowLifecycle?.onTasksScheduled ) { await this.flowLifecycle.onTasksScheduled( userId, - processed.updateTasks.map((task) => ({ + result.updateTasks.map((task) => ({ name: task.name, data: task.data, intervalMs: task.intervalMs, @@ -322,8 +326,9 @@ export class Orchestrator { } // Dispatch suggested outputs or actions - for (const output of processed.suggestedOutputs ?? []) { + for (const output of result.suggestedOutputs ?? 
[]) { const handler = this.ioHandlers.get(output.name); + if (!handler) { this.logger.warn( "Orchestrator.runAutonomousFlow", @@ -332,80 +337,92 @@ export class Orchestrator { continue; } - if (handler.role === HandlerRole.OUTPUT) { - outputs.push({ name: output.name, data: output.data }); - await this.dispatchToOutput( - output.name, - request, - output.data - ); - - if (this.flowLifecycle?.onFlowStep) { - await this.flowLifecycle.onFlowStep( - orchestratorId, - userId, - HandlerRole.OUTPUT, + // Depending on the handler role, dispatch appropriately + switch (handler.role) { + case HandlerRole.OUTPUT: + outputs.push({ + name: output.name, + data: output.data, + }); + await this.dispatchToOutput( output.name, + request, output.data ); - } - if (this.flowLifecycle?.onOutputDispatched) { - await this.flowLifecycle.onOutputDispatched( - orchestratorId, - userId, + if (this.flowLifecycle?.onFlowStep) { + await this.flowLifecycle.onFlowStep( + orchestratorId, + userId, + HandlerRole.OUTPUT, + output.name, + output.data + ); + } + + if (this.flowLifecycle?.onOutputDispatched) { + await this.flowLifecycle.onOutputDispatched( + orchestratorId, + userId, + output.name, + output.data + ); + } + break; + + case HandlerRole.ACTION: + const actionResult = await this.dispatchToAction( output.name, + request, output.data ); - } - } else if (handler.role === HandlerRole.ACTION) { - const actionResult = await this.dispatchToAction( - output.name, - request, - output.data - ); - if ( - this.flowLifecycle?.onActionDispatched && - this.flowLifecycle?.onFlowStep - ) { - await this.flowLifecycle.onFlowStep( - orchestratorId, - userId, - HandlerRole.ACTION, - output.name, - { input: output.data, result: actionResult } - ); - await this.flowLifecycle.onActionDispatched( - orchestratorId, - userId, - output.name, - output.data, - actionResult - ); - } - - // Queue new data from action results - if (actionResult) { - const newItems = Array.isArray(actionResult) - ? actionResult - : [actionResult]; - for (const item of newItems) { - queue.push({ data: item, source: output.name }); + if ( + this.flowLifecycle?.onFlowStep && + this.flowLifecycle?.onActionDispatched + ) { + await this.flowLifecycle.onFlowStep( + orchestratorId, + userId, + HandlerRole.ACTION, + output.name, + { input: output.data, result: actionResult } + ); + await this.flowLifecycle.onActionDispatched( + orchestratorId, + userId, + output.name, + output.data, + actionResult + ); } - } - } else { - this.logger.warn( - "Orchestrator.runAutonomousFlow", - "Suggested output has an unrecognized role", - handler.role - ); + + // Queue any new data returned from the action + if (actionResult) { + const newItems = Array.isArray(actionResult) + ? 
actionResult + : [actionResult]; + for (const item of newItems) { + queue.push({ + data: item, + source: output.name, + }); + } + } + break; + + default: + this.logger.warn( + "Orchestrator.runAutonomousFlow", + "Suggested output has an unrecognized role", + handler.role + ); } } } } - // Return the final outputs + // Return all collected outputs return outputs; } From d8b1973463c2c40c92c3d31ef56d4aaea923de7b Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Fri, 31 Jan 2025 17:57:05 +1100 Subject: [PATCH 05/11] fix memory --- packages/core/src/core/db/mongo-db.ts | 2 + packages/core/src/core/io/discord.ts | 3 +- packages/core/src/core/life-cycle.ts | 33 +++++--- packages/core/src/core/memory.ts | 4 +- packages/core/src/core/orchestrator.ts | 102 ++++++++++++------------- 5 files changed, 77 insertions(+), 67 deletions(-) diff --git a/packages/core/src/core/db/mongo-db.ts b/packages/core/src/core/db/mongo-db.ts index ffe37a71..cf8d440e 100644 --- a/packages/core/src/core/db/mongo-db.ts +++ b/packages/core/src/core/db/mongo-db.ts @@ -214,6 +214,7 @@ export class MongoDb implements OrchestratorDb { name: string, data: unknown ): Promise { + console.log("addMessage", orchestratorId, role, name, data); await this.orchestratorCollection.updateOne( { _id: orchestratorId }, { @@ -241,6 +242,7 @@ export class MongoDb implements OrchestratorDb { const doc = await this.orchestratorCollection.findOne({ _id: orchestratorId, }); + console.log("getMessages", orchestratorId, doc); if (!doc) return []; return doc.messages; } diff --git a/packages/core/src/core/io/discord.ts b/packages/core/src/core/io/discord.ts index c3be1be4..61403e66 100644 --- a/packages/core/src/core/io/discord.ts +++ b/packages/core/src/core/io/discord.ts @@ -94,7 +94,8 @@ export class DiscordClient { onData({ content: message.content, - channelId: message.channelId, + contentId: message.id, + conversationId: message.channel.id, sentBy: message.author?.id, }); }; diff --git a/packages/core/src/core/life-cycle.ts b/packages/core/src/core/life-cycle.ts index edd14ff6..113f5316 100644 --- a/packages/core/src/core/life-cycle.ts +++ b/packages/core/src/core/life-cycle.ts @@ -144,12 +144,23 @@ export function makeFlowLifecycle( source, ...updates, }); + + await orchestratorDb.addMessage( + conversationId, + HandlerRole.INPUT, + source, + { + content, + conversationId, + } + ); }, async onMemoriesRequested( conversationId: string, limit?: number ): Promise<{ memories: Memory[]; chatHistory: OrchestratorMessage[] }> { + console.log("onMemoriesRequested", conversationId, limit); // get vector based memories // todo, we could base this on a userID so the agent has memories across conversations const memories = @@ -161,6 +172,8 @@ export function makeFlowLifecycle( const chatHistory = await orchestratorDb.getMessages(conversationId); + console.log("chatHistory", memories, chatHistory); + return { memories, chatHistory }; }, @@ -185,7 +198,7 @@ export interface FlowLifecycle { * Called when a new flow is started or continued. * Allows you to create or fetch an Orchestrator record, returning an ID if relevant. */ - onFlowStart?( + onFlowStart( userId: string, sourceName: string, initialData: unknown @@ -194,7 +207,7 @@ export interface FlowLifecycle { /** * Called when new data is processed in the flow (e.g., an input message). 
*/ - onFlowStep?( + onFlowStep( orchestratorId: string | undefined, userId: string, role: HandlerRole, @@ -206,7 +219,7 @@ export interface FlowLifecycle { * Called when the Orchestrator wants to schedule tasks (e.g. recurring tasks). * You can store them in your DB or in a queue system. */ - onTasksScheduled?( + onTasksScheduled( userId: string, tasks: { name: string; @@ -218,7 +231,7 @@ export interface FlowLifecycle { /** * Called after an output is dispatched (e.g. store it or log it). */ - onOutputDispatched?( + onOutputDispatched( orchestratorId: string | undefined, userId: string, outputName: string, @@ -228,7 +241,7 @@ export interface FlowLifecycle { /** * Called after an action is dispatched (e.g. store it or log it). */ - onActionDispatched?( + onActionDispatched( orchestratorId: string | undefined, userId: string, actionName: string, @@ -239,7 +252,7 @@ export interface FlowLifecycle { /** * Called when content has been processed in a conversation */ - onContentProcessed?( + onContentProcessed( userId: string, conversationId: string, content: string, @@ -259,7 +272,7 @@ export interface FlowLifecycle { /** * Called when a conversation is updated */ - onConversationUpdated?( + onConversationUpdated( contentId: string, conversationId: string, content: string, @@ -270,7 +283,7 @@ export interface FlowLifecycle { /** * Called when a new memory needs to be added to a conversation */ - onMemoryAdded?( + onMemoryAdded( conversationId: string, content: string, source: string, @@ -280,7 +293,7 @@ export interface FlowLifecycle { /** * Called when memories need to be retrieved for a conversation */ - onMemoriesRequested?( + onMemoriesRequested( conversationId: string, limit?: number ): Promise<{ memories: Memory[]; chatHistory: OrchestratorMessage[] }>; @@ -288,7 +301,7 @@ export interface FlowLifecycle { /** * Called to check if specific content has been processed in a conversation */ - onCheckContentProcessed?( + onCheckContentProcessed( contentId: string, conversationId: string ): Promise; diff --git a/packages/core/src/core/memory.ts b/packages/core/src/core/memory.ts index 05b0b3ad..9ed72e7d 100644 --- a/packages/core/src/core/memory.ts +++ b/packages/core/src/core/memory.ts @@ -38,12 +38,12 @@ export interface OrchestratorDb { getOrchestratorsByUserId(userId: string): Promise; createOrchestrator(userId: string): Promise; addMessage( - orchestratorId: string, + conversationId: string, role: HandlerRole, name: string, data: any ): Promise; - getMessages(orchestratorId: string): Promise; + getMessages(conversationId: string): Promise; // Task management methods createTask( diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index a353d0cb..d5077d87 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -283,15 +283,14 @@ export class Orchestrator { const { data, source } = queue.shift()!; // Notify of an incoming step - if (this.flowLifecycle?.onFlowStep) { - await this.flowLifecycle.onFlowStep( - orchestratorId, - userId, - HandlerRole.INPUT, - source, - data - ); - } + + await this.flowLifecycle.onFlowStep( + orchestratorId, + userId, + HandlerRole.INPUT, + source, + data + ); // Main content processing const processedResults = await this.processContent( @@ -311,10 +310,7 @@ export class Orchestrator { } // Schedule tasks if present - if ( - result.updateTasks?.length && - this.flowLifecycle?.onTasksScheduled - ) { + if (result.updateTasks?.length) { await this.flowLifecycle.onTasksScheduled( 
userId, result.updateTasks.map((task) => ({ @@ -433,7 +429,7 @@ export class Orchestrator { public async processContent( content: ProcessableContent | ProcessableContent[], source: string, - userId?: string + userId: string ): Promise { if (Array.isArray(content)) { const allResults: ProcessedResult[] = []; @@ -469,25 +465,20 @@ export class Orchestrator { private async processContentItem( content: ProcessableContent, source: string, - userId?: string + userId: string ): Promise { let memories: { memories: Memory[]; chatHistory: OrchestratorMessage[]; } = { memories: [], chatHistory: [] }; - if ( - content.conversationId && - content.contentId && - this.flowLifecycle?.onCheckContentProcessed - ) { - const hasProcessed = + if (content.conversationId && content.contentId) { + if ( await this.flowLifecycle.onCheckContentProcessed( content.contentId, content.conversationId - ); - - if (hasProcessed) { + ) + ) { this.logger.debug( "Orchestrator.processContentItem", "Content already processed", @@ -500,31 +491,28 @@ export class Orchestrator { return null; } - if (userId && this.flowLifecycle?.onMemoriesRequested) { - // Ensure the conversation - const conversation = - await this.flowLifecycle.onConversationCreated( - userId, - content.conversationId, - source - ); + // Ensure the conversation + const conversation = await this.flowLifecycle.onConversationCreated( + userId, + content.conversationId, + source + ); - memories = await this.flowLifecycle.onMemoriesRequested( - conversation.id - ); + memories = await this.flowLifecycle.onMemoriesRequested( + conversation.id + ); - this.logger.debug( - "Orchestrator.processContentItem", - "Processing content with context", - { - content, - source, - conversationId: conversation.id, - userId, - relevantMemories: memories, - } - ); - } + this.logger.debug( + "Orchestrator.processContentItem", + "Processing content with context", + { + content, + source, + conversationId: conversation.id, + userId, + relevantMemories: memories, + } + ); } // Gather possible outputs & actions @@ -546,13 +534,7 @@ export class Orchestrator { ); // If there's a conversationId, store the memory and mark processed - if ( - content.conversationId && - result && - content.contentId && - this.flowLifecycle?.onMemoryAdded && - this.flowLifecycle?.onConversationUpdated - ) { + if (content.conversationId && content.contentId) { await this.flowLifecycle.onMemoryAdded( content.conversationId, JSON.stringify(result.content), @@ -562,6 +544,18 @@ export class Orchestrator { ...result.enrichedContext, } ); + + this.logger.debug( + "Orchestrator.processContentItem", + "Updating conversation", + { + contentId: content.contentId, + conversationId: content.conversationId, + userId, + result, + } + ); + await this.flowLifecycle.onConversationUpdated( content.contentId, content.conversationId, From b6984015e8b39c1cb9463cda7c230b4349ddd944 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Sat, 1 Feb 2025 22:38:57 +1100 Subject: [PATCH 06/11] types and cleanup --- examples/example-api.ts | 48 ++--- examples/example-discord.ts | 4 +- examples/example-server.ts | 17 +- examples/example-twitter.ts | 32 ++- packages/core/src/core/db/mongo-db.ts | 67 +++++-- packages/core/src/core/io/discord.ts | 25 ++- packages/core/src/core/life-cycle.ts | 112 +++++------ packages/core/src/core/memory.ts | 34 +++- packages/core/src/core/orchestrator.ts | 261 +++++++++---------------- packages/core/src/core/types/index.ts | 33 +++- 10 files changed, 316 insertions(+), 317 deletions(-) diff --git 
a/examples/example-api.ts b/examples/example-api.ts index 9e80ac2f..8a835a2a 100644 --- a/examples/example-api.ts +++ b/examples/example-api.ts @@ -9,7 +9,7 @@ import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole } from "../packages/core/src/core/types"; -import { RoomManager } from "../packages/core/src/core/conversation-manager"; +import { ConversationManager } from "../packages/core/src/core/conversation-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { ResearchQuantProcessor } from "../packages/core/src/core/processors/research-processor"; @@ -22,6 +22,7 @@ import { z } from "zod"; import readline from "readline"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; +import { makeFlowLifecycle } from "../packages/core/src/core/life-cycle"; async function main() { const loglevel = LogLevel.DEBUG; @@ -33,7 +34,7 @@ async function main() { await vectorDb.purge(); // Clear previous session data - const roomManager = new RoomManager(vectorDb); + const conversationManager = new ConversationManager(vectorDb); // Research client const researchClient = new LLMClient({ @@ -52,22 +53,16 @@ async function main() { loglevel ); - // Initialize processor with default character personality - const messageProcessor = new MessageProcessor( - llmClient, - defaultCharacter, - loglevel - ); - - const researchProcessor = new ResearchQuantProcessor( - researchClient, - defaultCharacter, - loglevel, - 1000 // chunk size, depends - ); - // Add processors to the master processor - masterProcessor.addProcessor([messageProcessor, researchProcessor]); + masterProcessor.addProcessor([ + new MessageProcessor(llmClient, defaultCharacter, loglevel), + new ResearchQuantProcessor( + researchClient, + defaultCharacter, + loglevel, + 1000 // chunk size, depends + ), + ]); const scheduledTaskDb = new MongoDb( "mongodb://localhost:27017", @@ -80,12 +75,9 @@ async function main() { await scheduledTaskDb.deleteAll(); - // Initialize core system const orchestrator = new Orchestrator( - roomManager, - vectorDb, masterProcessor, - scheduledTaskDb, + makeFlowLifecycle(scheduledTaskDb, conversationManager), { level: loglevel, enableColors: true, @@ -94,7 +86,7 @@ async function main() { ); // Initialize autonomous thought generation - const consciousness = new Consciousness(llmClient, roomManager, { + const consciousness = new Consciousness(llmClient, conversationManager, { intervalMs: 300000, // Think every 5 minutes minConfidence: 0.7, logLevel: loglevel, @@ -220,12 +212,12 @@ async function main() { const outputs: any = await orchestrator.dispatchToInput( "user_chat", { - headers: { - "x-user-id": userId, - }, - }, - userMessage, - userId + userId, + platformId: "discord", + threadId: "123", + data: { content: userMessage }, + contentId: "123", + } ); // Now `outputs` is an array of suggestions with role=output that got triggered diff --git a/examples/example-discord.ts b/examples/example-discord.ts index d85bef0b..c2efe121 100644 --- a/examples/example-discord.ts +++ b/examples/example-discord.ts @@ -90,9 +90,7 @@ async function main() { name: "discord_stream", role: HandlerRole.INPUT, subscribe: (onData) => { - discord.startMessageStream((incomingMessage: Message) => { - onData(incomingMessage); - }); + discord.startMessageStream(onData); return () => { 
discord.stopMessageStream(); }; diff --git a/examples/example-server.ts b/examples/example-server.ts index 5a74836d..edac04b8 100644 --- a/examples/example-server.ts +++ b/examples/example-server.ts @@ -154,16 +154,13 @@ wss.on("connection", (ws) => { } // Process the message using the orchestrator with the provided userId - const outputs = await orchestrator.dispatchToInput( - "user_chat", - { - headers: { - "x-user-id": userId, - }, - }, - { content: userMessage }, - orchestratorId ? orchestratorId : undefined - ); + const outputs = await orchestrator.dispatchToInput("user_chat", { + userId, + platformId: "discord", + threadId: orchestratorId, + data: { content: userMessage }, + contentId: orchestratorId, + }); // Send responses back through WebSocket if (outputs && (outputs as any).length > 0) { diff --git a/examples/example-twitter.ts b/examples/example-twitter.ts index 097104bc..bcc5d531 100644 --- a/examples/example-twitter.ts +++ b/examples/example-twitter.ts @@ -136,15 +136,21 @@ async function main() { return []; } - // Map mentions to the required format - return mentions.map((mention) => ({ - type: "tweet", - conversationId: mention.metadata.conversationId, - contentId: mention.metadata.tweetId, - user: mention.metadata.username, - content: mention.content, - metadata: mention, - })); + // Filter out mentions that do not have the required non-null properties before mapping + return mentions + .filter( + (mention) => + mention.metadata.tweetId !== undefined && + mention.metadata.conversationId !== undefined && + mention.metadata.userId !== undefined + ) + .map((mention) => ({ + userId: mention.metadata.userId!, + threadId: mention.metadata.conversationId!, + contentId: mention.metadata.tweetId!, + platformId: "twitter", + data: mention, + })); }, }); @@ -161,7 +167,13 @@ async function main() { return []; } - return thought; + return { + userId: "internal", + threadId: "internal", + contentId: "internal", + platformId: "internal", + data: thought, + }; }, }); diff --git a/packages/core/src/core/db/mongo-db.ts b/packages/core/src/core/db/mongo-db.ts index cf8d440e..f70fbc1f 100644 --- a/packages/core/src/core/db/mongo-db.ts +++ b/packages/core/src/core/db/mongo-db.ts @@ -1,9 +1,8 @@ import { MongoClient, Collection, ObjectId } from "mongodb"; -import type { HandlerRole } from "../types"; +import type { Chat, ChatMessage, HandlerRole } from "../types"; import type { OrchestratorChat, OrchestratorDb, - OrchestratorMessage, ScheduledTask, } from "../memory"; @@ -12,6 +11,8 @@ export class MongoDb implements OrchestratorDb { private collection!: Collection; private orchestratorCollection!: Collection; + private chatsCollection!: Collection; + /** * @param uri A MongoDB connection string * @param dbName Name of the database to use @@ -36,6 +37,18 @@ export class MongoDb implements OrchestratorDb { const db = this.client.db(this.dbName); this.collection = db.collection(this.collectionName); + this.chatsCollection = db.collection("chats"); + + // Create indexes for efficient querying + await this.chatsCollection.createIndex( + { + userId: 1, + platformId: 1, + threadId: 1, + }, + { unique: true } + ); + // Optional: Create indexes // - An index on nextRunAt helps find "due" tasks quickly // - An index on status helps filter quickly by status @@ -185,18 +198,33 @@ export class MongoDb implements OrchestratorDb { await this.collection.deleteMany({}); } - /** - * Creates a new "orchestrator" document for a user, returning its generated _id. 
- * This can represent a "new chat/session" with the agent. - */ - public async createOrchestrator(userId: string): Promise { - const chat: OrchestratorChat = { - userId: userId, + public async getOrCreateChat( + userId: string, + platformId: string, + threadId: string, + metadata?: Record + ): Promise { + const existingChat = await this.chatsCollection.findOne({ + userId, + platformId, + threadId, + }); + + if (existingChat) { + return existingChat._id!.toString(); + } + + const chat: Chat = { + userId, + platformId, + threadId, createdAt: new Date(), updatedAt: new Date(), messages: [], + metadata, }; - const result = await this.orchestratorCollection.insertOne(chat); + + const result = await this.chatsCollection.insertOne(chat); return result.insertedId.toString(); } @@ -208,15 +236,14 @@ export class MongoDb implements OrchestratorDb { * @param name - The name/id of the IOHandler. * @param data - The data payload to store (e.g., text, JSON from APIs, etc). */ - public async addMessage( - orchestratorId: string, + public async addChatMessage( + chatId: string, role: HandlerRole, name: string, data: unknown ): Promise { - console.log("addMessage", orchestratorId, role, name, data); - await this.orchestratorCollection.updateOne( - { _id: orchestratorId }, + await this.chatsCollection.updateOne( + { _id: chatId }, { $push: { messages: { @@ -236,13 +263,11 @@ export class MongoDb implements OrchestratorDb { /** * Retrieves all messages in a specific orchestrator's conversation. */ - public async getMessages( - orchestratorId: string - ): Promise { - const doc = await this.orchestratorCollection.findOne({ - _id: orchestratorId, + public async getChatMessages(chatId: string): Promise { + const doc = await this.chatsCollection.findOne({ + _id: chatId, }); - console.log("getMessages", orchestratorId, doc); + if (!doc) return []; return doc.messages; } diff --git a/packages/core/src/core/io/discord.ts b/packages/core/src/core/io/discord.ts index 61403e66..45a92e4e 100644 --- a/packages/core/src/core/io/discord.ts +++ b/packages/core/src/core/io/discord.ts @@ -6,7 +6,12 @@ import { Partials, } from "discord.js"; import { Logger } from "../../core/logger"; -import { HandlerRole, LogLevel, type IOHandler } from "../types"; +import { + HandlerRole, + LogLevel, + type IOHandler, + type ProcessableContent, +} from "../types"; import { env } from "../../core/env"; import { z } from "zod"; @@ -75,7 +80,9 @@ export class DiscordClient { * Optionally start listening to Discord messages. * The onData callback typically feeds data into Orchestrator or similar. */ - public startMessageStream(onData: (data: any) => void) { + public startMessageStream( + onData: (data: ProcessableContent | ProcessableContent[]) => void + ) { this.logger.info("DiscordClient", "Starting message stream..."); // If you want to capture the listener reference for removal: @@ -93,10 +100,13 @@ export class DiscordClient { } onData({ - content: message.content, + userId: message.author?.displayName, + platformId: "discord", + threadId: message.channel.id, contentId: message.id, - conversationId: message.channel.id, - sentBy: message.author?.id, + data: { + content: message.content, + }, }); }; @@ -133,7 +143,10 @@ export class DiscordClient { role: HandlerRole.OUTPUT, name: "discord_message", execute: async (data: T) => { - return await this.sendMessage(data as MessageData); + // Cast the result to ProcessableContent to satisfy the IOHandler signature. 
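
The new chats collection gives every (userId, platformId, threadId) triple a single durable record. A brief, hedged sketch of how these storage methods compose follows; it is not part of the patch, it assumes the mongo-db.ts signatures shown above, and recordDiscordTurn plus all ids and payloads are invented for illustration.

import { HandlerRole } from "../packages/core/src/core/types";
import type { OrchestratorDb } from "../packages/core/src/core/memory";

async function recordDiscordTurn(db: OrchestratorDb) {
    // Repeated calls with the same (userId, platformId, threadId) triple
    // return the same chat id, backed by the unique compound index.
    const chatId = await db.getOrCreateChat("user_42", "discord", "channel_123");

    await db.addChatMessage(chatId, HandlerRole.INPUT, "discord_stream", {
        content: "When are we deploying?",
    });
    await db.addChatMessage(chatId, HandlerRole.OUTPUT, "discord_message", {
        content: "The deploy is scheduled for Friday.",
    });

    const history = await db.getChatMessages(chatId);
    console.log(`chat ${chatId} has ${history.length} messages`);
}
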
+ return (await this.sendMessage( + data as MessageData + )) as unknown as ProcessableContent; }, outputSchema: messageSchema, }; diff --git a/packages/core/src/core/life-cycle.ts b/packages/core/src/core/life-cycle.ts index 113f5316..666d4a53 100644 --- a/packages/core/src/core/life-cycle.ts +++ b/packages/core/src/core/life-cycle.ts @@ -1,7 +1,4 @@ -// MyFlowLifecycle.ts - -import { HandlerRole, type Memory } from "./types"; -// Suppose we have an OrchestratorDb or some DB client: +import { HandlerRole, type ChatMessage, type Memory } from "./types"; import type { OrchestratorDb, OrchestratorMessage } from "./memory"; import type { ConversationManager } from "./conversation-manager"; import type { Conversation } from "./conversation"; @@ -13,26 +10,25 @@ export function makeFlowLifecycle( return { async onFlowStart( userId: string, - sourceName: string, + platformId: string, + threadId: string, initialData: unknown ): Promise { - return await orchestratorDb.createOrchestrator(userId); + return await orchestratorDb.getOrCreateChat( + userId, + platformId, + threadId + ); }, async onFlowStep( - orchestratorId: string | undefined, - userId: string, + chatId: string | undefined, role: HandlerRole, sourceName: string, data: unknown ): Promise { - if (!orchestratorId) return; - await orchestratorDb.addMessage( - orchestratorId, - role, - sourceName, - data - ); + if (!chatId) return; + await orchestratorDb.addChatMessage(chatId, role, sourceName, data); }, async onTasksScheduled( @@ -56,14 +52,13 @@ export function makeFlowLifecycle( }, async onOutputDispatched( - orchestratorId: string | undefined, - userId: string, + chatId: string | undefined, outputName: string, outputData: unknown ): Promise { - if (!orchestratorId) return; - await orchestratorDb.addMessage( - orchestratorId, + if (!chatId) return; + await orchestratorDb.addChatMessage( + chatId, HandlerRole.OUTPUT, outputName, outputData @@ -71,15 +66,14 @@ export function makeFlowLifecycle( }, async onActionDispatched( - orchestratorId: string | undefined, - userId: string, + chatId: string | undefined, actionName: string, inputData: unknown, result: unknown ): Promise { - if (!orchestratorId) return; - await orchestratorDb.addMessage( - orchestratorId, + if (!chatId) return; + await orchestratorDb.addChatMessage( + chatId, HandlerRole.ACTION, actionName, { @@ -91,31 +85,31 @@ export function makeFlowLifecycle( async onContentProcessed( contentId: string, - conversationId: string, + threadId: string, content: string, metadata?: Record ): Promise { const hasProcessed = await conversationManager.hasProcessedContentInConversation( contentId, - conversationId + threadId ); if (hasProcessed) { return; } await conversationManager.markContentAsProcessed( contentId, - conversationId + threadId ); }, async onConversationCreated( userId: string, - conversationId: string, + threadId: string, source: string ): Promise { return await conversationManager.ensureConversation( - conversationId, + threadId, source, userId ); @@ -123,67 +117,61 @@ export function makeFlowLifecycle( async onConversationUpdated( contentId: string, - conversationId: string, + threadId: string, content: string, source: string, updates: Record ): Promise { await conversationManager.markContentAsProcessed( contentId, - conversationId + threadId ); }, async onMemoryAdded( - conversationId: string, + chatId: string, content: string, source: string, updates: Record ): Promise { - await conversationManager.addMemory(conversationId, content, { + await 
conversationManager.addMemory(chatId, content, { source, ...updates, }); - await orchestratorDb.addMessage( - conversationId, + await orchestratorDb.addChatMessage( + chatId, HandlerRole.INPUT, source, { content, - conversationId, + chatId, } ); }, async onMemoriesRequested( - conversationId: string, + chatId: string, limit?: number - ): Promise<{ memories: Memory[]; chatHistory: OrchestratorMessage[] }> { - console.log("onMemoriesRequested", conversationId, limit); + ): Promise<{ memories: Memory[] }> { // get vector based memories // todo, we could base this on a userID so the agent has memories across conversations const memories = await conversationManager.getMemoriesFromConversation( - conversationId, + chatId, limit ); - // TODO: get history from db - const chatHistory = - await orchestratorDb.getMessages(conversationId); - - console.log("chatHistory", memories, chatHistory); - return { memories, chatHistory }; + return { memories }; }, async onCheckContentProcessed( contentId: string, - conversationId: string + chatId: string ): Promise { return await conversationManager.hasProcessedContentInConversation( contentId, - conversationId + chatId ); }, }; @@ -200,7 +188,8 @@ export interface FlowLifecycle { */ onFlowStart( userId: string, - sourceName: string, + platformId: string, + threadId: string, initialData: unknown ): Promise; @@ -208,8 +197,7 @@ export interface FlowLifecycle { * Called when new data is processed in the flow (e.g., an input message). */ onFlowStep( - orchestratorId: string | undefined, - userId: string, + chatId: string | undefined, role: HandlerRole, sourceName: string, data: unknown @@ -232,8 +220,7 @@ export interface FlowLifecycle { * Called after an output is dispatched (e.g. store it or log it). */ onOutputDispatched( - orchestratorId: string | undefined, - userId: string, + chatId: string | undefined, outputName: string, outputData: unknown ): Promise; @@ -242,8 +229,7 @@ export interface FlowLifecycle { * Called after an action is dispatched (e.g. store it or log it). 
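
With every hook now required and keyed by chatId, a lifecycle can be wrapped to add cross-cutting behaviour without touching the orchestrator. A small, hedged sketch follows; it is not part of the patch, it assumes the hook signatures shown above, and withFlowLogging plus the log strings are invented for illustration.

import type { FlowLifecycle } from "../packages/core/src/core/life-cycle";

function withFlowLogging(base: FlowLifecycle): FlowLifecycle {
    return {
        ...base,
        async onFlowStart(userId, platformId, threadId, initialData) {
            const chatId = await base.onFlowStart(
                userId,
                platformId,
                threadId,
                initialData
            );
            console.log(`[flow] start ${platformId}/${threadId} -> chat ${chatId}`);
            return chatId;
        },
        async onOutputDispatched(chatId, outputName, outputData) {
            console.log(`[flow] output ${outputName} for chat ${chatId}`);
            return base.onOutputDispatched(chatId, outputName, outputData);
        },
    };
}

Such a wrapper could be handed to the Orchestrator wherever the updated examples currently pass the bare makeFlowLifecycle(...) result.
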
*/ onActionDispatched( - orchestratorId: string | undefined, - userId: string, + chatId: string | undefined, actionName: string, inputData: unknown, result: unknown @@ -254,7 +240,7 @@ export interface FlowLifecycle { */ onContentProcessed( userId: string, - conversationId: string, + threadId: string, content: string, metadata?: Record ): Promise; @@ -264,7 +250,7 @@ export interface FlowLifecycle { */ onConversationCreated( userId: string, - conversationId: string, + threadId: string, source: string, metadata?: Record ): Promise; @@ -274,7 +260,7 @@ export interface FlowLifecycle { */ onConversationUpdated( contentId: string, - conversationId: string, + threadId: string, content: string, source: string, updates: Record @@ -284,7 +270,7 @@ export interface FlowLifecycle { * Called when a new memory needs to be added to a conversation */ onMemoryAdded( - conversationId: string, + chatId: string, content: string, source: string, updates: Record @@ -294,15 +280,15 @@ export interface FlowLifecycle { * Called when memories need to be retrieved for a conversation */ onMemoriesRequested( - conversationId: string, + chatId: string, limit?: number - ): Promise<{ memories: Memory[]; chatHistory: OrchestratorMessage[] }>; + ): Promise<{ memories: Memory[] }>; /** * Called to check if specific content has been processed in a conversation */ onCheckContentProcessed( contentId: string, - conversationId: string + chatId: string ): Promise; } diff --git a/packages/core/src/core/memory.ts b/packages/core/src/core/memory.ts index 9ed72e7d..89767fcb 100644 --- a/packages/core/src/core/memory.ts +++ b/packages/core/src/core/memory.ts @@ -29,6 +29,25 @@ export interface OrchestratorChat { messages: OrchestratorMessage[]; } +export interface Chat { + _id?: string; + userId: string; + platformId: string; // e.g., "twitter", "telegram" + threadId: string; // platform-specific thread/conversation ID + createdAt: Date; + updatedAt: Date; + messages: ChatMessage[]; + metadata?: Record; // Platform-specific data +} + +export interface ChatMessage { + role: HandlerRole; + name: string; + data: unknown; + timestamp: Date; + messageId?: string; // Platform-specific message ID if available +} + export interface OrchestratorDb { connect(): Promise; close(): Promise; @@ -36,14 +55,19 @@ export interface OrchestratorDb { // Orchestrator methods getOrchestratorById(id: string): Promise; getOrchestratorsByUserId(userId: string): Promise; - createOrchestrator(userId: string): Promise; - addMessage( - conversationId: string, + getOrCreateChat( + userId: string, + platformId: string, + threadId: string, + metadata?: Record + ): Promise; + addChatMessage( + chatId: string, role: HandlerRole, name: string, - data: any + data: unknown ): Promise; - getMessages(conversationId: string): Promise; + getChatMessages(chatId: string): Promise; // Task management methods createTask( diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index d5077d87..e5e05d42 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -32,7 +32,7 @@ export class Orchestrator { constructor( private processor: BaseProcessor, - private readonly flowLifecycle: FlowLifecycle, + private readonly flowHooks: FlowLifecycle, config?: LoggerConfig ) { this.logger = new Logger( @@ -69,17 +69,15 @@ export class Orchestrator { this.ioHandlers.set(handler.name, handler); - if (handler.role === "input" && handler.subscribe) { + if (handler.role === HandlerRole.INPUT && handler.subscribe) { 
const unsubscribe = handler.subscribe(async (data) => { this.logger.info( "Orchestrator.registerIOHandler", "Starting stream", { data } ); - // Simulate a request-like object here if you want a consistent approach. - const fakeRequest: AgentRequest = { headers: {} }; - // Whenever data arrives, pass it into runAutonomousFlow - await this.runAutonomousFlow(fakeRequest, data, handler.name); + + await this.runAutonomousFlow(data, handler.name); }); this.unsubscribers.set(handler.name, unsubscribe); } @@ -115,7 +113,6 @@ export class Orchestrator { */ public async dispatchToOutput( name: string, - request: AgentRequest, data: ProcessableContent ): Promise { const handler = this.ioHandlers.get(name); @@ -130,7 +127,6 @@ export class Orchestrator { this.logger.debug("Orchestrator.dispatchToOutput", "Executing output", { name, data, - headers: request.headers, }); try { @@ -159,7 +155,6 @@ export class Orchestrator { */ public async dispatchToAction( name: string, - request: AgentRequest, data: ProcessableContent ): Promise { const handler = this.ioHandlers.get(name); @@ -179,7 +174,6 @@ export class Orchestrator { { name, data, - headers: request.headers, } ); return result; @@ -202,9 +196,7 @@ export class Orchestrator { */ public async dispatchToInput( name: string, - request: AgentRequest, - data: ProcessableContent, - orchestratorId?: string + data: ProcessableContent ): Promise { const handler = this.ioHandlers.get(name); if (!handler) throw new Error(`No IOHandler: ${name}`); @@ -220,12 +212,7 @@ export class Orchestrator { const result = await handler.execute(data); if (result) { - return await this.runAutonomousFlow( - request, - result, - handler.name, - orchestratorId - ); + return await this.runAutonomousFlow(result, handler.name); } return []; } catch (error) { @@ -243,61 +230,44 @@ export class Orchestrator { * Processes incoming data through the system and manages any follow-on * "action" or "output" suggestions in a queue. * - * @param request A request-like object containing headers and other metadata - * @param initialData The data payload to be processed + * @param data The data payload to be processed * @param sourceName The name of the IOHandler that provided this data - * @param orchestratorId An optional existing orchestrator record ID to associate with */ private async runAutonomousFlow( - request: AgentRequest, - initialData: unknown, - sourceName: string, - orchestratorId?: string + data: ProcessableContent | ProcessableContent[], + sourceName: string ): Promise> { - const userId = request.headers["x-user-id"] ?? "agent"; - // Prepare the processing queue - const queue: Array<{ data: unknown; source: string }> = Array.isArray( - initialData - ) - ? initialData.map((item) => ({ data: item, source: sourceName })) - : [{ data: initialData, source: sourceName }]; + const queue: Array<{ data: ProcessableContent; source: string }> = + Array.isArray(data) + ? 
data.map((item) => ({ data: item, source: sourceName })) + : [{ data: data, source: sourceName }]; // Initialize an array to collect outputs const outputs: Array<{ name: string; data: any }> = []; - // Trigger onFlowStart, possibly updating orchestratorId - if (this.flowLifecycle?.onFlowStart) { - const flowId = await this.flowLifecycle.onFlowStart( - userId, - sourceName, - initialData - ); - if (flowId) { - orchestratorId = flowId; - } - } - // Process the queue until empty while (queue.length > 0) { const { data, source } = queue.shift()!; - // Notify of an incoming step + // Starts the chat + const chatId = await this.flowHooks.onFlowStart( + data.userId, + data.platformId, + data.threadId, + data.data + ); - await this.flowLifecycle.onFlowStep( - orchestratorId, - userId, + // process the input as a step - recording the input + await this.flowHooks.onFlowStep( + chatId, HandlerRole.INPUT, source, data ); // Main content processing - const processedResults = await this.processContent( - data as ProcessableContent | ProcessableContent[], - source, - userId - ); + const processedResults = await this.processContent(data, source); if (!processedResults?.length) { continue; @@ -311,8 +281,8 @@ export class Orchestrator { // Schedule tasks if present if (result.updateTasks?.length) { - await this.flowLifecycle.onTasksScheduled( - userId, + await this.flowHooks.onTasksScheduled( + data.userId, result.updateTasks.map((task) => ({ name: task.name, data: task.data, @@ -340,58 +310,46 @@ export class Orchestrator { name: output.name, data: output.data, }); + await this.dispatchToOutput( output.name, - request, output.data ); - if (this.flowLifecycle?.onFlowStep) { - await this.flowLifecycle.onFlowStep( - orchestratorId, - userId, - HandlerRole.OUTPUT, - output.name, - output.data - ); - } + await this.flowHooks.onFlowStep( + chatId, + HandlerRole.OUTPUT, + output.name, + output.data + ); + + await this.flowHooks.onOutputDispatched( + chatId, + output.name, + output.data + ); - if (this.flowLifecycle?.onOutputDispatched) { - await this.flowLifecycle.onOutputDispatched( - orchestratorId, - userId, - output.name, - output.data - ); - } break; case HandlerRole.ACTION: const actionResult = await this.dispatchToAction( output.name, - request, output.data ); - if ( - this.flowLifecycle?.onFlowStep && - this.flowLifecycle?.onActionDispatched - ) { - await this.flowLifecycle.onFlowStep( - orchestratorId, - userId, - HandlerRole.ACTION, - output.name, - { input: output.data, result: actionResult } - ); - await this.flowLifecycle.onActionDispatched( - orchestratorId, - userId, - output.name, - output.data, - actionResult - ); - } + await this.flowHooks.onFlowStep( + chatId, + HandlerRole.ACTION, + output.name, + { input: output.data, result: actionResult } + ); + + await this.flowHooks.onActionDispatched( + chatId, + output.name, + output.data, + actionResult + ); // Queue any new data returned from the action if (actionResult) { @@ -428,19 +386,14 @@ export class Orchestrator { */ public async processContent( content: ProcessableContent | ProcessableContent[], - source: string, - userId: string + source: string ): Promise { if (Array.isArray(content)) { const allResults: ProcessedResult[] = []; for (const item of content) { // Example delay: remove if not needed await new Promise((resolve) => setTimeout(resolve, 5000)); - const result = await this.processContentItem( - item, - source, - userId - ); + const result = await this.processContentItem(item, source); if (result) { allResults.push(result); } @@ 
-448,11 +401,7 @@ export class Orchestrator { return allResults; } - const singleResult = await this.processContentItem( - content, - source, - userId - ); + const singleResult = await this.processContentItem(content, source); return singleResult ? [singleResult] : []; } @@ -464,41 +413,21 @@ export class Orchestrator { */ private async processContentItem( content: ProcessableContent, - source: string, - userId: string + source: string ): Promise { let memories: { memories: Memory[]; - chatHistory: OrchestratorMessage[]; - } = { memories: [], chatHistory: [] }; - - if (content.conversationId && content.contentId) { - if ( - await this.flowLifecycle.onCheckContentProcessed( - content.contentId, - content.conversationId - ) - ) { - this.logger.debug( - "Orchestrator.processContentItem", - "Content already processed", - { - contentId: content.contentId, - conversationId: content.conversationId, - userId, - } - ); - return null; - } + } = { memories: [] }; - // Ensure the conversation - const conversation = await this.flowLifecycle.onConversationCreated( - userId, - content.conversationId, - source - ); + const conversation = await this.flowHooks.onConversationCreated( + content.userId, + content.threadId, + source + ); - memories = await this.flowLifecycle.onMemoriesRequested( + if (content.threadId && content.userId) { + // Ensure the conversation exists + memories = await this.flowHooks.onMemoriesRequested( conversation.id ); @@ -509,7 +438,7 @@ export class Orchestrator { content, source, conversationId: conversation.id, - userId, + userId: content.userId, relevantMemories: memories, } ); @@ -533,37 +462,37 @@ export class Orchestrator { } ); - // If there's a conversationId, store the memory and mark processed - if (content.conversationId && content.contentId) { - await this.flowLifecycle.onMemoryAdded( - content.conversationId, - JSON.stringify(result.content), - source, - { - ...result.metadata, - ...result.enrichedContext, - } - ); + // Save the memory + await this.flowHooks.onMemoryAdded( + conversation.id, + JSON.stringify(result.content), + source, + { + ...result.metadata, + ...result.enrichedContext, + } + ); - this.logger.debug( - "Orchestrator.processContentItem", - "Updating conversation", - { - contentId: content.contentId, - conversationId: content.conversationId, - userId, - result, - } - ); + this.logger.debug( + "Orchestrator.processContentItem", + "Updating conversation", + { + conversationId: conversation.id, + contentId: content.contentId, + threadId: content.threadId, + userId: content.userId, + result, + } + ); - await this.flowLifecycle.onConversationUpdated( - content.contentId, - content.conversationId, - JSON.stringify(result.content), - source, - result.metadata - ); - } + // Update the conversation + await this.flowHooks.onConversationUpdated( + content.contentId, + conversation.id, + JSON.stringify(result.content), + source, + result.metadata + ); return result; } diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts index ee185524..4029e6a8 100644 --- a/packages/core/src/core/types/index.ts +++ b/packages/core/src/core/types/index.ts @@ -554,7 +554,9 @@ export interface InputIOHandler extends BaseIOHandler { /** Function to process input data */ execute?: (data: any) => Promise; /** Sets up a subscription to receive streaming data */ - subscribe?: (onData: (data: any) => void) => () => void; + subscribe?: ( + onData: (data: ProcessableContent | ProcessableContent[]) => void + ) => () => void; } /** @@ -580,7 +582,7 @@ 
export interface OutputIOHandler extends BaseIOHandler { /** Required schema to validate output data */ outputSchema: z.ZodType; /** Function to process and send output */ - execute?: (data: any) => Promise; + execute?: (data: any) => any; /** Sets up a subscription to handle output streams */ subscribe?: (onData: (data: any) => void) => () => void; } @@ -619,7 +621,28 @@ export interface AgentRequest { * Base interface for any content that can be processed */ export interface ProcessableContent { - conversationId?: string; - contentId?: string; - [key: string]: any; // Allow additional properties + contentId: string; + userId: string; + platformId: string; + threadId: string; + data: unknown; +} + +export interface Chat { + _id?: string; + userId: string; // the user the agent is interacting with could be an agent or a human + platformId: string; // e.g., "twitter", "telegram" + threadId: string; // platform-specific thread/conversation ID + createdAt: Date; + updatedAt: Date; + messages: ChatMessage[]; + metadata?: Record; // Platform-specific data +} + +export interface ChatMessage { + role: HandlerRole; + name: string; + data: unknown; + timestamp: Date; + messageId?: string; // Platform-specific message ID if available } From 5a87cf9d9e1de4bdeb9e3265ec8af7104dcc5f00 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Sat, 1 Feb 2025 22:41:29 +1100 Subject: [PATCH 07/11] build error --- packages/core/src/core/schedule-service.ts | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/packages/core/src/core/schedule-service.ts b/packages/core/src/core/schedule-service.ts index 7fdb9a31..f150da7f 100644 --- a/packages/core/src/core/schedule-service.ts +++ b/packages/core/src/core/schedule-service.ts @@ -52,33 +52,20 @@ export class SchedulerService { case HandlerRole.INPUT: await this.orchestrator.dispatchToInput( task.handlerName, - { - headers: { - "x-user-id": task.userId, - }, - }, + data ); break; case HandlerRole.ACTION: await this.orchestrator.dispatchToAction( task.handlerName, - { - headers: { - "x-user-id": task.userId, - }, - }, + data ); break; case HandlerRole.OUTPUT: await this.orchestrator.dispatchToOutput( task.handlerName, - { - headers: { - "x-user-id": task.userId, - }, - }, data ); break; From 8d2f3e3516b8ca15cd7e6f0b8f1733642cfe4e49 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Sat, 1 Feb 2025 22:47:09 +1100 Subject: [PATCH 08/11] update --- packages/core/src/core/life-cycle.ts | 4 ++-- packages/core/src/core/orchestrator.ts | 9 +-------- packages/core/src/core/types/index.ts | 4 ---- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/packages/core/src/core/life-cycle.ts b/packages/core/src/core/life-cycle.ts index 666d4a53..29d0f222 100644 --- a/packages/core/src/core/life-cycle.ts +++ b/packages/core/src/core/life-cycle.ts @@ -1,5 +1,5 @@ -import { HandlerRole, type ChatMessage, type Memory } from "./types"; -import type { OrchestratorDb, OrchestratorMessage } from "./memory"; +import { HandlerRole, type Memory } from "./types"; +import type { OrchestratorDb } from "./memory"; import type { ConversationManager } from "./conversation-manager"; import type { Conversation } from "./conversation"; diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index e5e05d42..d22314fa 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -1,16 +1,9 @@ import { Logger } from "./logger"; import type { BaseProcessor } from "./processor"; 
-import type { - AgentRequest, - Memory, - ProcessableContent, - ProcessedResult, -} from "./types"; +import type { Memory, ProcessableContent, ProcessedResult } from "./types"; import { HandlerRole, LogLevel, type LoggerConfig } from "./types"; import type { IOHandler } from "./types"; - import type { FlowLifecycle } from "./life-cycle"; -import type { OrchestratorMessage } from "./memory"; export class Orchestrator { /** diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts index 4029e6a8..8cd7137f 100644 --- a/packages/core/src/core/types/index.ts +++ b/packages/core/src/core/types/index.ts @@ -613,10 +613,6 @@ export interface ActionIOHandler extends BaseIOHandler { /** Union type of all possible IO handler types */ export type IOHandler = InputIOHandler | OutputIOHandler | ActionIOHandler; -export interface AgentRequest { - headers: Record; -} - /** * Base interface for any content that can be processed */ From 78bb7ce4fc0db8fb940fe6042826418609378c9b Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Sun, 2 Feb 2025 10:05:27 +1100 Subject: [PATCH 09/11] hooks --- packages/core/src/core/life-cycle.ts | 63 ++----------------- packages/core/src/core/orchestrator.ts | 24 ++----- packages/core/src/core/processor.ts | 4 +- .../src/core/processors/master-processor.ts | 3 +- 4 files changed, 13 insertions(+), 81 deletions(-) diff --git a/packages/core/src/core/life-cycle.ts b/packages/core/src/core/life-cycle.ts index 29d0f222..0c4e6b81 100644 --- a/packages/core/src/core/life-cycle.ts +++ b/packages/core/src/core/life-cycle.ts @@ -13,12 +13,8 @@ export function makeFlowLifecycle( platformId: string, threadId: string, initialData: unknown - ): Promise { - return await orchestratorDb.getOrCreateChat( - userId, - platformId, - threadId - ); + ): Promise { + return orchestratorDb.getOrCreateChat(userId, platformId, threadId); }, async onFlowStep( @@ -51,38 +47,6 @@ export function makeFlowLifecycle( } }, - async onOutputDispatched( - chatId: string | undefined, - outputName: string, - outputData: unknown - ): Promise { - if (!chatId) return; - await orchestratorDb.addChatMessage( - chatId, - HandlerRole.OUTPUT, - outputName, - outputData - ); - }, - - async onActionDispatched( - chatId: string | undefined, - actionName: string, - inputData: unknown, - result: unknown - ): Promise { - if (!chatId) return; - await orchestratorDb.addChatMessage( - chatId, - HandlerRole.ACTION, - actionName, - { - input: inputData, - result, - } - ); - }, - async onContentProcessed( contentId: string, threadId: string, @@ -191,13 +155,13 @@ export interface FlowLifecycle { platformId: string, threadId: string, initialData: unknown - ): Promise; + ): Promise; /** * Called when new data is processed in the flow (e.g., an input message). */ onFlowStep( - chatId: string | undefined, + chatId: string, role: HandlerRole, sourceName: string, data: unknown @@ -216,25 +180,6 @@ export interface FlowLifecycle { }[] ): Promise; - /** - * Called after an output is dispatched (e.g. store it or log it). - */ - onOutputDispatched( - chatId: string | undefined, - outputName: string, - outputData: unknown - ): Promise; - - /** - * Called after an action is dispatched (e.g. store it or log it). 
- */ - onActionDispatched( - chatId: string | undefined, - actionName: string, - inputData: unknown, - result: unknown - ): Promise; - /** * Called when content has been processed in a conversation */ diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index d22314fa..ccab6eb9 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -70,7 +70,7 @@ export class Orchestrator { { data } ); - await this.runAutonomousFlow(data, handler.name); + await this.run(data, handler.name); }); this.unsubscribers.set(handler.name, unsubscribe); } @@ -205,7 +205,7 @@ export class Orchestrator { const result = await handler.execute(data); if (result) { - return await this.runAutonomousFlow(result, handler.name); + return await this.run(result, handler.name); } return []; } catch (error) { @@ -226,7 +226,7 @@ export class Orchestrator { * @param data The data payload to be processed * @param sourceName The name of the IOHandler that provided this data */ - private async runAutonomousFlow( + private async run( data: ProcessableContent | ProcessableContent[], sourceName: string ): Promise> { @@ -251,7 +251,6 @@ export class Orchestrator { data.data ); - // process the input as a step - recording the input await this.flowHooks.onFlowStep( chatId, HandlerRole.INPUT, @@ -290,7 +289,7 @@ export class Orchestrator { if (!handler) { this.logger.warn( - "Orchestrator.runAutonomousFlow", + "Orchestrator.run", `No handler found for suggested output: ${output.name}` ); continue; @@ -316,12 +315,6 @@ export class Orchestrator { output.data ); - await this.flowHooks.onOutputDispatched( - chatId, - output.name, - output.data - ); - break; case HandlerRole.ACTION: @@ -337,13 +330,6 @@ export class Orchestrator { { input: output.data, result: actionResult } ); - await this.flowHooks.onActionDispatched( - chatId, - output.name, - output.data, - actionResult - ); - // Queue any new data returned from the action if (actionResult) { const newItems = Array.isArray(actionResult) @@ -360,7 +346,7 @@ export class Orchestrator { default: this.logger.warn( - "Orchestrator.runAutonomousFlow", + "Orchestrator.run", "Suggested output has an unrecognized role", handler.role ); diff --git a/packages/core/src/core/processor.ts b/packages/core/src/core/processor.ts index 1e2491b4..70eee6ba 100644 --- a/packages/core/src/core/processor.ts +++ b/packages/core/src/core/processor.ts @@ -2,7 +2,7 @@ import { Logger } from "./logger"; import { LogLevel, type Character, type ProcessedResult } from "./types"; -import type { IOHandler } from "./types"; +import type { IOHandler, ProcessableContent } from "./types"; export abstract class BaseProcessor { /** Logger instance for this processor */ @@ -47,7 +47,7 @@ export abstract class BaseProcessor { * Processes the given content and returns a result. 
*/ public abstract process( - content: any, + content: ProcessableContent, otherContext: string, ioContext?: { availableOutputs?: IOHandler[]; diff --git a/packages/core/src/core/processors/master-processor.ts b/packages/core/src/core/processors/master-processor.ts index 7cf122de..5f7652b9 100644 --- a/packages/core/src/core/processors/master-processor.ts +++ b/packages/core/src/core/processors/master-processor.ts @@ -4,6 +4,7 @@ import type { ActionIOHandler, Character, OutputIOHandler, + ProcessableContent, ProcessedResult, SuggestedOutput, } from "../types"; @@ -46,7 +47,7 @@ export class MasterProcessor extends BaseProcessor { } async process( - content: any, + content: ProcessableContent, otherContext: string, ioContext?: { availableOutputs: OutputIOHandler[]; From 8a6b421f7caece9b7208c0159a7a2d032b2be0f2 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Sun, 2 Feb 2025 10:07:31 +1100 Subject: [PATCH 10/11] hooks --- examples/example-discord.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/examples/example-discord.ts b/examples/example-discord.ts index c2efe121..b5aa3f46 100644 --- a/examples/example-discord.ts +++ b/examples/example-discord.ts @@ -14,10 +14,8 @@ import { LLMClient } from "../packages/core/src/core/llm-client"; import { env } from "../packages/core/src/core/env"; import chalk from "chalk"; import { defaultCharacter } from "../packages/core/src/core/character"; -import { z } from "zod"; import readline from "readline"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; -import { Message } from "discord.js"; import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; import { makeFlowLifecycle } from "../packages/core/src/core/life-cycle"; @@ -52,21 +50,21 @@ async function main() { ); // Connect to MongoDB (for scheduled tasks, if you use them) - const scheduledTaskDb = new MongoDb( + const KVDB = new MongoDb( "mongodb://localhost:27017", "myApp", "scheduled_tasks" ); - await scheduledTaskDb.connect(); + await KVDB.connect(); console.log(chalk.green("✅ Scheduled task database connected")); // Clear any existing tasks if you like - await scheduledTaskDb.deleteAll(); + await KVDB.deleteAll(); // Create the Orchestrator const core = new Orchestrator( masterProcessor, - makeFlowLifecycle(scheduledTaskDb, conversationManager), + makeFlowLifecycle(KVDB, conversationManager), { level: loglevel, enableColors: true, From 199bf7df0fcdb3004882fa9e6faa99dbabcac734 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Sun, 2 Feb 2025 10:18:36 +1100 Subject: [PATCH 11/11] cleanup --- packages/core/src/core/db/mongo-db.ts | 8 +- packages/core/src/core/life-cycle.ts | 3 +- packages/core/src/core/memory.ts | 55 +---- packages/core/src/core/orchestrator.ts | 307 ++++++++++++------------- packages/core/src/core/types/index.ts | 47 ++++ packages/core/src/core/vector-db.ts | 1 - 6 files changed, 211 insertions(+), 210 deletions(-) diff --git a/packages/core/src/core/db/mongo-db.ts b/packages/core/src/core/db/mongo-db.ts index f70fbc1f..056a6901 100644 --- a/packages/core/src/core/db/mongo-db.ts +++ b/packages/core/src/core/db/mongo-db.ts @@ -1,10 +1,12 @@ import { MongoClient, Collection, ObjectId } from "mongodb"; -import type { Chat, ChatMessage, HandlerRole } from "../types"; import type { + Chat, + ChatMessage, + HandlerRole, OrchestratorChat, - OrchestratorDb, ScheduledTask, -} from "../memory"; +} from "../types"; +import type { OrchestratorDb } from "../memory"; export class MongoDb implements 
OrchestratorDb { private client: MongoClient; diff --git a/packages/core/src/core/life-cycle.ts b/packages/core/src/core/life-cycle.ts index 0c4e6b81..d7503e80 100644 --- a/packages/core/src/core/life-cycle.ts +++ b/packages/core/src/core/life-cycle.ts @@ -142,8 +142,7 @@ export function makeFlowLifecycle( } /** - * A set of optional lifecycle callbacks for orchestrator events. - * The Orchestrator will call these methods if they are defined. + * A set of lifecycle callbacks for orchestrator events. */ export interface FlowLifecycle { /** diff --git a/packages/core/src/core/memory.ts b/packages/core/src/core/memory.ts index 89767fcb..7b1ce65d 100644 --- a/packages/core/src/core/memory.ts +++ b/packages/core/src/core/memory.ts @@ -1,53 +1,12 @@ -import type { HandlerRole, Memory } from "./types"; +import type { + ChatMessage, + HandlerRole, + Memory, + OrchestratorChat, + ScheduledTask, +} from "./types"; import type { Conversation } from "./conversation"; -// Define interfaces matching MongoDB document shapes -export interface ScheduledTask { - _id: string; - userId: string; - handlerName: string; - taskData: Record; - nextRunAt: Date; - intervalMs?: number; - status: "pending" | "running" | "completed" | "failed"; - createdAt: Date; - updatedAt: Date; -} - -export interface OrchestratorMessage { - role: HandlerRole; - name: string; - data: unknown; - timestamp: Date; -} - -export interface OrchestratorChat { - _id?: string; - userId: string; - createdAt: Date; - updatedAt: Date; - messages: OrchestratorMessage[]; -} - -export interface Chat { - _id?: string; - userId: string; - platformId: string; // e.g., "twitter", "telegram" - threadId: string; // platform-specific thread/conversation ID - createdAt: Date; - updatedAt: Date; - messages: ChatMessage[]; - metadata?: Record; // Platform-specific data -} - -export interface ChatMessage { - role: HandlerRole; - name: string; - data: unknown; - timestamp: Date; - messageId?: string; // Platform-specific message ID if available -} - export interface OrchestratorDb { connect(): Promise; close(): Promise; diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index ccab6eb9..146323f3 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -7,8 +7,7 @@ import type { FlowLifecycle } from "./life-cycle"; export class Orchestrator { /** - * Unified collection of IOHandlers (both input & output). - * Keyed by .name + * Unified collection of IOHandlers (both input & output), keyed by name. */ private readonly ioHandlers = new Map(); @@ -18,8 +17,7 @@ export class Orchestrator { private readonly logger: Logger; /** - * Map of unsubscribe functions for various handlers. - * Keyed by handler name. + * Map of unsubscribe functions for various handlers, keyed by handler name. */ private unsubscribers = new Map void>(); @@ -47,9 +45,8 @@ export class Orchestrator { } /** - * Primary method to register any IOHandler (input or output). - * - If it's an input with an interval, schedule it for recurring runs. - * - Otherwise, just store it in the ioHandlers map. + * Registers an IOHandler (input or output). For input handlers with a subscribe method, + * registers the subscription and stores its unsubscriber. 
*/ public registerIOHandler(handler: IOHandler): void { if (this.ioHandlers.has(handler.name)) { @@ -69,7 +66,6 @@ export class Orchestrator { "Starting stream", { data } ); - await this.run(data, handler.name); }); this.unsubscribers.set(handler.name, unsubscribe); @@ -83,17 +79,15 @@ export class Orchestrator { } /** - * Removes a handler (input or output) by name, stopping scheduling if needed. + * Removes a handler (input or output) by name and stops its scheduling if needed. */ public removeIOHandler(name: string): void { - // If we have an unsubscribe function, call it const unsub = this.unsubscribers.get(name); if (unsub) { - unsub(); // e.g. remove event listeners, clear intervals, etc. + unsub(); // E.g. remove event listeners, clear intervals, etc. this.unsubscribers.delete(name); } - // Remove the handler itself this.ioHandlers.delete(name); this.logger.info("Orchestrator.removeIOHandler", "Removed IOHandler", { @@ -102,7 +96,7 @@ export class Orchestrator { } /** - * Dispatches data to a registered *output* handler by name, passing in a request plus data. + * Dispatches data to a registered *output* handler by name. */ public async dispatchToOutput( name: string, @@ -112,8 +106,7 @@ export class Orchestrator { if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); } - - if (handler.role !== "output") { + if (handler.role !== HandlerRole.OUTPUT) { throw new Error(`Handler "${name}" is not an output handler`); } @@ -124,27 +117,22 @@ export class Orchestrator { try { const result = await handler.execute(data); - this.logger.info("Orchestrator.dispatchToOutput", "Output result", { result, }); - return result; } catch (error) { this.logger.error( "Orchestrator.dispatchToOutput", "Handler threw an error", - { - name, - error, - } + { name, error } ); throw error; } } /** - * Dispatches data to a registered *action* handler by name, passing in a request plus data. + * Dispatches data to a registered *action* handler by name. */ public async dispatchToAction( name: string, @@ -154,13 +142,12 @@ export class Orchestrator { if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); } - if (handler.role !== "action") { + if (handler.role !== HandlerRole.ACTION) { throw new Error(`Handler "${name}" is not an action handler`); } try { const result = await handler.execute(data); - this.logger.debug( "Orchestrator.dispatchToAction", "Executing action", @@ -174,36 +161,32 @@ export class Orchestrator { this.logger.error( "Orchestrator.dispatchToAction", "Handler threw an error", - { - name, - error, - } + { name, error } ); throw error; } } /** - * Dispatches data to a registered *input* handler by name, passing in a request plus data. - * Then continues through the autonomous flow. + * Dispatches data to a registered *input* handler by name, then continues through the autonomous flow. 
*/ public async dispatchToInput( name: string, data: ProcessableContent ): Promise { const handler = this.ioHandlers.get(name); - if (!handler) throw new Error(`No IOHandler: ${name}`); + if (!handler) { + throw new Error(`No IOHandler: ${name}`); + } if (!handler.execute) { throw new Error(`Handler "${name}" has no execute method`); } - if (handler.role !== "input") { + if (handler.role !== HandlerRole.INPUT) { throw new Error(`Handler "${name}" is not role=input`); } try { - // Possibly run a transformation or normalizing step inside `handler.execute` const result = await handler.execute(data); - if (result) { return await this.run(result, handler.name); } @@ -220,148 +203,158 @@ export class Orchestrator { } /** - * Processes incoming data through the system and manages any follow-on - * "action" or "output" suggestions in a queue. + * Main processing loop: feeds incoming data into the processing queue and + * dispatches any suggested outputs or actions. * - * @param data The data payload to be processed - * @param sourceName The name of the IOHandler that provided this data + * @param data The initial data or array of data to process. + * @param sourceName The name of the IOHandler that provided this data. */ private async run( data: ProcessableContent | ProcessableContent[], sourceName: string ): Promise> { - // Prepare the processing queue + // Initialize the processing queue const queue: Array<{ data: ProcessableContent; source: string }> = Array.isArray(data) ? data.map((item) => ({ data: item, source: sourceName })) - : [{ data: data, source: sourceName }]; + : [{ data, source: sourceName }]; - // Initialize an array to collect outputs - const outputs: Array<{ name: string; data: any }> = []; + const collectedOutputs: Array<{ name: string; data: any }> = []; - // Process the queue until empty while (queue.length > 0) { - const { data, source } = queue.shift()!; - - // Starts the chat - const chatId = await this.flowHooks.onFlowStart( - data.userId, - data.platformId, - data.threadId, - data.data - ); + const currentItem = queue.shift()!; + const outputs = await this.processQueueItem(currentItem, queue); + collectedOutputs.push(...outputs); + } - await this.flowHooks.onFlowStep( - chatId, - HandlerRole.INPUT, - source, - data - ); + return collectedOutputs; + } + + /** + * Processes one queue item: + * - Starts a conversation flow. + * - Processes the content. + * - Dispatches any suggested outputs or actions. + * + * @param item The queue item containing the data and its source. + * @param queue The current processing queue (to which new items may be added). + */ + private async processQueueItem( + item: { data: ProcessableContent; source: string }, + queue: Array<{ data: ProcessableContent; source: string }> + ): Promise> { + const { data, source } = item; + const outputs: Array<{ name: string; data: any }> = []; - // Main content processing - const processedResults = await this.processContent(data, source); + // Start the conversation/flow. + const chatId = await this.flowHooks.onFlowStart( + data.userId, + data.platformId, + data.threadId, + data.data + ); + + await this.flowHooks.onFlowStep( + chatId, + HandlerRole.INPUT, + source, + data + ); + + // Process the content. + const processedResults = await this.processContent(data, source); + if (!processedResults?.length) { + return outputs; + } - if (!processedResults?.length) { + // Handle each processed result. 
+ for (const result of processedResults) { + if (result.alreadyProcessed) { continue; } - // Handle each processed result - for (const result of processedResults) { - if (result.alreadyProcessed) { - continue; - } + // Schedule any tasks if present. + if (result.updateTasks?.length) { + await this.flowHooks.onTasksScheduled( + data.userId, + result.updateTasks.map((task) => ({ + name: task.name, + data: task.data, + intervalMs: task.intervalMs, + })) + ); + } - // Schedule tasks if present - if (result.updateTasks?.length) { - await this.flowHooks.onTasksScheduled( - data.userId, - result.updateTasks.map((task) => ({ - name: task.name, - data: task.data, - intervalMs: task.intervalMs, - })) + // Process any suggested outputs or actions. + for (const suggestion of result.suggestedOutputs ?? []) { + const handler = this.ioHandlers.get(suggestion.name); + if (!handler) { + this.logger.warn( + "Orchestrator.processQueueItem", + `No handler found for suggested output: ${suggestion.name}` ); + continue; } - // Dispatch suggested outputs or actions - for (const output of result.suggestedOutputs ?? []) { - const handler = this.ioHandlers.get(output.name); - - if (!handler) { - this.logger.warn( - "Orchestrator.run", - `No handler found for suggested output: ${output.name}` + switch (handler.role) { + case HandlerRole.OUTPUT: + outputs.push({ + name: suggestion.name, + data: suggestion.data, + }); + await this.dispatchToOutput( + suggestion.name, + suggestion.data ); - continue; - } + await this.flowHooks.onFlowStep( + chatId, + HandlerRole.OUTPUT, + suggestion.name, + suggestion.data + ); + break; - // Depending on the handler role, dispatch appropriately - switch (handler.role) { - case HandlerRole.OUTPUT: - outputs.push({ - name: output.name, - data: output.data, - }); - - await this.dispatchToOutput( - output.name, - output.data - ); - - await this.flowHooks.onFlowStep( - chatId, - HandlerRole.OUTPUT, - output.name, - output.data - ); - - break; - - case HandlerRole.ACTION: - const actionResult = await this.dispatchToAction( - output.name, - output.data - ); - - await this.flowHooks.onFlowStep( - chatId, - HandlerRole.ACTION, - output.name, - { input: output.data, result: actionResult } - ); - - // Queue any new data returned from the action - if (actionResult) { - const newItems = Array.isArray(actionResult) - ? actionResult - : [actionResult]; - for (const item of newItems) { - queue.push({ - data: item, - source: output.name, - }); - } + case HandlerRole.ACTION: { + const actionResult = await this.dispatchToAction( + suggestion.name, + suggestion.data + ); + await this.flowHooks.onFlowStep( + chatId, + HandlerRole.ACTION, + suggestion.name, + { input: suggestion.data, result: actionResult } + ); + if (actionResult) { + const newItems = Array.isArray(actionResult) + ? actionResult + : [actionResult]; + for (const newItem of newItems) { + queue.push({ + data: newItem, + source: suggestion.name, + }); } - break; - - default: - this.logger.warn( - "Orchestrator.run", - "Suggested output has an unrecognized role", - handler.role - ); + } + break; } + + default: + this.logger.warn( + "Orchestrator.processQueueItem", + "Suggested output has an unrecognized role", + handler.role + ); } } } - // Return all collected outputs return outputs; } /** - * Processes *any* content by splitting it into items (if needed) and - * calling the single-item processor. + * Processes content by handling both single items and arrays. + * A small delay is introduced for each item (if processing an array). 
*/ public async processContent( content: ProcessableContent | ProcessableContent[], @@ -370,8 +363,7 @@ export class Orchestrator { if (Array.isArray(content)) { const allResults: ProcessedResult[] = []; for (const item of content) { - // Example delay: remove if not needed - await new Promise((resolve) => setTimeout(resolve, 5000)); + await this.delay(5000); // Example delay; remove if not needed. const result = await this.processContentItem(item, source); if (result) { allResults.push(result); @@ -385,18 +377,16 @@ export class Orchestrator { } /** - * Process a single item: - * - Retrieves prior memories from its conversation - * - Passes it to the main (or "master") processor - * - Saves result to memory & marks processed if relevant + * Processes a single content item: + * - Retrieves conversation context and prior memories. + * - Passes the item to the processor. + * - Updates conversation and memory. */ private async processContentItem( content: ProcessableContent, source: string ): Promise { - let memories: { - memories: Memory[]; - } = { memories: [] }; + let memories: { memories: Memory[] } = { memories: [] }; const conversation = await this.flowHooks.onConversationCreated( content.userId, @@ -405,7 +395,6 @@ export class Orchestrator { ); if (content.threadId && content.userId) { - // Ensure the conversation exists memories = await this.flowHooks.onMemoriesRequested( conversation.id ); @@ -423,7 +412,7 @@ export class Orchestrator { ); } - // Gather possible outputs & actions + // Collect available outputs and actions. const availableOutputs = Array.from(this.ioHandlers.values()).filter( (h) => h.role === HandlerRole.OUTPUT ); @@ -431,7 +420,7 @@ export class Orchestrator { (h) => h.role === HandlerRole.ACTION ); - // Processor main entry point + // Process the content. const result = await this.processor.process( content, JSON.stringify(memories), @@ -441,7 +430,7 @@ export class Orchestrator { } ); - // Save the memory + // Save memory and update the conversation. await this.flowHooks.onMemoryAdded( conversation.id, JSON.stringify(result.content), @@ -464,7 +453,6 @@ export class Orchestrator { } ); - // Update the conversation await this.flowHooks.onConversationUpdated( content.contentId, conversation.id, @@ -475,4 +463,11 @@ export class Orchestrator { return result; } + + /** + * A helper method to introduce a delay. 
+ */ + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } } diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts index 8cd7137f..de95c34e 100644 --- a/packages/core/src/core/types/index.ts +++ b/packages/core/src/core/types/index.ts @@ -642,3 +642,50 @@ export interface ChatMessage { timestamp: Date; messageId?: string; // Platform-specific message ID if available } + +// Define interfaces matching MongoDB document shapes +export interface ScheduledTask { + _id: string; + userId: string; + handlerName: string; + taskData: Record; + nextRunAt: Date; + intervalMs?: number; + status: "pending" | "running" | "completed" | "failed"; + createdAt: Date; + updatedAt: Date; +} + +export interface OrchestratorMessage { + role: HandlerRole; + name: string; + data: unknown; + timestamp: Date; +} + +export interface OrchestratorChat { + _id?: string; + userId: string; + createdAt: Date; + updatedAt: Date; + messages: OrchestratorMessage[]; +} + +export interface Chat { + _id?: string; + userId: string; + platformId: string; // e.g., "twitter", "telegram" + threadId: string; // platform-specific thread/conversation ID + createdAt: Date; + updatedAt: Date; + messages: ChatMessage[]; + metadata?: Record; +} + +export interface ChatMessage { + role: HandlerRole; + name: string; + data: unknown; + timestamp: Date; + messageId?: string; // Platform-specific message ID if available +} diff --git a/packages/core/src/core/vector-db.ts b/packages/core/src/core/vector-db.ts index 6cf74c70..1146634d 100644 --- a/packages/core/src/core/vector-db.ts +++ b/packages/core/src/core/vector-db.ts @@ -18,7 +18,6 @@ import { type VectorDB, } from "./types"; import { isValidDateValue } from "./utils"; -import { CharacterTextSplitter } from "@langchain/textsplitters"; export class ChromaVectorDB implements VectorDB { // Static collection names