Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ponderingdemocritus committed Jan 31, 2025
1 parent 2292723 commit 8b6cfb0
Showing 1 changed file with 94 additions and 77 deletions.
171 changes: 94 additions & 77 deletions packages/core/src/core/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,35 +253,36 @@ export class Orchestrator {
initialData: unknown,
sourceName: string,
orchestratorId?: string
) {
const userId = request.headers["x-user-id"] || "agent";
): Promise<Array<{ name: string; data: any }>> {
const userId = request.headers["x-user-id"] ?? "agent";

// Initialize the processing queue with the initial data
// Prepare the processing queue
const queue: Array<{ data: unknown; source: string }> = Array.isArray(
initialData
)
? initialData.map((item) => ({ data: item, source: sourceName }))
: [{ data: initialData, source: sourceName }];

// Initialize an array to collect outputs
const outputs: Array<{ name: string; data: any }> = [];

// Trigger the onFlowStart hook if defined
// Trigger onFlowStart, possibly updating orchestratorId
if (this.flowLifecycle?.onFlowStart) {
const maybeId = await this.flowLifecycle.onFlowStart(
const flowId = await this.flowLifecycle.onFlowStart(
userId,
sourceName,
initialData
);
if (maybeId) {
orchestratorId = maybeId;
if (flowId) {
orchestratorId = flowId;
}
}

// Process each item in the queue
// Process the queue until empty
while (queue.length > 0) {
const { data, source } = queue.shift()!;

// Notify the onFlowStep hook of new input
// Notify of an incoming step
if (this.flowLifecycle?.onFlowStep) {
await this.flowLifecycle.onFlowStep(
orchestratorId,
Expand All @@ -292,28 +293,31 @@ export class Orchestrator {
);
}

// Main processing of the data
// Main content processing
const processedResults = await this.processContent(
data as ProcessableContent | ProcessableContent[],
source,
userId
);

if (!processedResults || processedResults.length === 0) {
if (!processedResults?.length) {
continue;
}

for (const processed of processedResults) {
if (processed.alreadyProcessed) continue;
// Handle each processed result
for (const result of processedResults) {
if (result.alreadyProcessed) {
continue;
}

// Schedule tasks if any are present
// Schedule tasks if present
if (
processed.updateTasks?.length &&
result.updateTasks?.length &&
this.flowLifecycle?.onTasksScheduled
) {
await this.flowLifecycle.onTasksScheduled(
userId,
processed.updateTasks.map((task) => ({
result.updateTasks.map((task) => ({
name: task.name,
data: task.data,
intervalMs: task.intervalMs,
Expand All @@ -322,8 +326,9 @@ export class Orchestrator {
}

// Dispatch suggested outputs or actions
for (const output of processed.suggestedOutputs ?? []) {
for (const output of result.suggestedOutputs ?? []) {
const handler = this.ioHandlers.get(output.name);

if (!handler) {
this.logger.warn(
"Orchestrator.runAutonomousFlow",
Expand All @@ -332,80 +337,92 @@ export class Orchestrator {
continue;
}

if (handler.role === HandlerRole.OUTPUT) {
outputs.push({ name: output.name, data: output.data });
await this.dispatchToOutput(
output.name,
request,
output.data
);

if (this.flowLifecycle?.onFlowStep) {
await this.flowLifecycle.onFlowStep(
orchestratorId,
userId,
HandlerRole.OUTPUT,
// Depending on the handler role, dispatch appropriately
switch (handler.role) {
case HandlerRole.OUTPUT:
outputs.push({
name: output.name,
data: output.data,
});
await this.dispatchToOutput(
output.name,
request,
output.data
);
}

if (this.flowLifecycle?.onOutputDispatched) {
await this.flowLifecycle.onOutputDispatched(
orchestratorId,
userId,
if (this.flowLifecycle?.onFlowStep) {
await this.flowLifecycle.onFlowStep(
orchestratorId,
userId,
HandlerRole.OUTPUT,
output.name,
output.data
);
}

if (this.flowLifecycle?.onOutputDispatched) {
await this.flowLifecycle.onOutputDispatched(
orchestratorId,
userId,
output.name,
output.data
);
}
break;

case HandlerRole.ACTION:
const actionResult = await this.dispatchToAction(
output.name,
request,
output.data
);
}
} else if (handler.role === HandlerRole.ACTION) {
const actionResult = await this.dispatchToAction(
output.name,
request,
output.data
);

if (
this.flowLifecycle?.onActionDispatched &&
this.flowLifecycle?.onFlowStep
) {
await this.flowLifecycle.onFlowStep(
orchestratorId,
userId,
HandlerRole.ACTION,
output.name,
{ input: output.data, result: actionResult }
);
await this.flowLifecycle.onActionDispatched(
orchestratorId,
userId,
output.name,
output.data,
actionResult
);
}

// Queue new data from action results
if (actionResult) {
const newItems = Array.isArray(actionResult)
? actionResult
: [actionResult];
for (const item of newItems) {
queue.push({ data: item, source: output.name });
if (
this.flowLifecycle?.onFlowStep &&
this.flowLifecycle?.onActionDispatched
) {
await this.flowLifecycle.onFlowStep(
orchestratorId,
userId,
HandlerRole.ACTION,
output.name,
{ input: output.data, result: actionResult }
);
await this.flowLifecycle.onActionDispatched(
orchestratorId,
userId,
output.name,
output.data,
actionResult
);
}
}
} else {
this.logger.warn(
"Orchestrator.runAutonomousFlow",
"Suggested output has an unrecognized role",
handler.role
);

// Queue any new data returned from the action
if (actionResult) {
const newItems = Array.isArray(actionResult)
? actionResult
: [actionResult];
for (const item of newItems) {
queue.push({
data: item,
source: output.name,
});
}
}
break;

default:
this.logger.warn(
"Orchestrator.runAutonomousFlow",
"Suggested output has an unrecognized role",
handler.role
);
}
}
}
}

// Return the final outputs
// Return all collected outputs
return outputs;
}

Expand Down

0 comments on commit 8b6cfb0

Please sign in to comment.