Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process: History and Config-func #6

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions agency.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ type OperationConfig struct {
Messages []Message
}

func (p *Operation) Config() *OperationConfig {
return p.config
}

// NewOperation allows to create an operation from a function.
func NewOperation(handler OperationHandler) *Operation {
return &Operation{
Expand Down
2 changes: 1 addition & 1 deletion examples/custom_operation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func main() {
increment := agency.NewOperation(incrementFunc)

msg, err := agency.NewProcess(
msg, err := agency.ProcessFromOperations(
increment, increment, increment,
).Execute(context.Background(), agency.UserMessage("0"))

Expand Down
4 changes: 2 additions & 2 deletions examples/logging/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})
params := openai.TextToTextParams{Model: "gpt-3.5-turbo"}

_, err := agency.NewProcess(
_, err := agency.ProcessFromOperations(
factory.TextToText(params).SetPrompt("explain what that means"),
factory.TextToText(params).SetPrompt("translate to russian"),
factory.TextToText(params).SetPrompt("replace all spaces with '_'"),
Expand All @@ -31,6 +31,6 @@ func main() {
}
}

func Logger(input, output agency.Message, cfg *agency.OperationConfig) {
func Logger(input, output agency.Message, cfg *agency.OperationConfig, _ uint) {
Copy link
Contributor Author

@emil14 emil14 Dec 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Pass history here?
  2. Combine all arguments into single structure?

P.S. - last parameter is step-index, interface of the interceptor was changed and interceptor was renamed to observer

fmt.Printf("in: %v\nprompt: %v\nout: %v\n\n", input, cfg.Prompt, output)
}
59 changes: 59 additions & 0 deletions examples/process_history/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"context"
"fmt"

_ "github.com/joho/godotenv/autoload"

"github.com/neurocult/agency"
"github.com/neurocult/agency/providers/openai"
)

func main() {
provider := openai.New(openai.Params{Key: "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"})
params := openai.TextToTextParams{
Model: "gpt-3.5-turbo",
Temperature: openai.Temperature(0),
MaxTokens: 100,
}

result, err := agency.NewProcess(
agency.ProcessStep{
Operation: provider.
TextToText(params).
SetPrompt("Increase the number by adding 1 to it. Answer only in numbers, without text"),
},
agency.ProcessStep{
Operation: provider.
TextToText(params).
SetPrompt("Double the number. Answer only in numbers, without text"),
},
agency.ProcessStep{
ConfigFunc: func(history agency.ProcessHistory, cfg *agency.OperationConfig) error {
firstStepResult, _ := history.Get(0)
cfg.Prompt = fmt.Sprintf("Add %s", firstStepResult)
return nil
},
Operation: provider.TextToText(params),
},
).Execute(
context.Background(),
agency.UserMessage("5"),
func(in, out agency.Message, cfg *agency.OperationConfig, stepIndex uint) {
fmt.Printf("---\n\nSTEP %d executed\n\nINPUT: %v\n\nCONFIG: %v\n\nOUTPUT: %v\n\n", stepIndex, in, cfg, out)
},
)

if err != nil {
panic(err)
}

fmt.Println(result)
}

// InjectHistory allows to pass history between operations by injecting it into the config.
func InjectHistory(history agency.ProcessHistory, cfg *agency.OperationConfig) error {
cfg.Messages = history.All()
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,31 @@ import (

type Saver []agency.Message

func (s *Saver) Save(input, output agency.Message, _ *agency.OperationConfig) {
// This is how we can retrieve process history by hand with the interceptor, without using the history itself.
// But we can't (or it's hard to do) pass history between steps this way. For that we can use config func.
func (s *Saver) Save(input, output agency.Message, _ *agency.OperationConfig, _ uint) {
*s = append(*s, output)
}

func main() {
factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})
provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})

// step 1
hear := factory.
hear := provider.
SpeechToText(openai.SpeechToTextParams{
Model: goopenai.Whisper1,
})

// step2
translate := factory.
translate := provider.
TextToText(openai.TextToTextParams{
Model: "gpt-3.5-turbo",
Temperature: openai.Temperature(0.5),
}).
SetPrompt("translate to russian")

// step 3
uppercase := factory.
uppercase := provider.
TextToText(openai.TextToTextParams{
Model: "gpt-3.5-turbo",
Temperature: openai.Temperature(1),
Expand All @@ -54,7 +56,7 @@ func main() {
ctx := context.Background()
speechMsg := agency.Message{Content: sound}

_, err = agency.NewProcess(
_, err = agency.ProcessFromOperations(
hear,
translate,
uppercase,
Expand Down
2 changes: 1 addition & 1 deletion examples/rag_vector_database/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
Model: "tts-1", ResponseFormat: "mp3", Speed: 1, Voice: "onyx",
})

result, err := agency.NewProcess(
result, err := agency.ProcessFromOperations(
retrieve,
summarize,
voice,
Expand Down
4 changes: 2 additions & 2 deletions examples/speech_to_text/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
)

func main() {
factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})
provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})

data, err := os.ReadFile("speech.mp3")
if err != nil {
panic(err)
}

result, err := factory.SpeechToText(openai.SpeechToTextParams{
result, err := provider.SpeechToText(openai.SpeechToTextParams{
Model: goopenai.Whisper1,
}).Execute(
context.Background(),
Expand Down
2 changes: 1 addition & 1 deletion examples/speech_to_text_to_image/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
panic(err)
}

msg, err := agency.NewProcess(
msg, err := agency.ProcessFromOperations(
factory.SpeechToText(openai.SpeechToTextParams{Model: goopenai.Whisper1}),
factory.TextToImage(openai.TextToImageParams{
Model: goopenai.CreateImageModelDallE2,
Expand Down
4 changes: 2 additions & 2 deletions examples/translate_text/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
)

func main() {
factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})
provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})

result, err := factory.
result, err := provider.
TextToText(openai.TextToTextParams{Model: goopenai.GPT3Dot5Turbo}).
SetPrompt("You are a helpful assistant that translates English to French").
Execute(context.Background(), agency.UserMessage("I love programming."))
Expand Down
81 changes: 67 additions & 14 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,88 @@ package agency

import (
"context"
"errors"
"fmt"
)

// Process is a chain of operations that can be executed in sequence.
// Process is a sequential chain of steps operations that can be executed in sequence.
type Process struct {
operations []*Operation
steps []ProcessStep
}

func NewProcess(operations ...*Operation) *Process {
return &Process{
operations: operations,
// ProcessStep is an object that can be chained with other steps forming the process.
type ProcessStep struct {
// Operation that current step depends on.
// It's execution is deferred until the process reaches the corresponding step.
Operation *Operation
// ConfigFunc allows to modify config based a on results from the previous steps.
// It's execution is deferred until the process reaches the corresponding step.
ConfigFunc func(ProcessHistory, *OperationConfig) error
}

// NewProcess creates new process based on a given steps. If you don't need history use ProcessFromOperations instead.
func NewProcess(steps ...ProcessStep) *Process {
return &Process{steps: steps}
}

// ProcessFromOperations allows to create process from operations.
// It's handy when all you need is to chain some operations together and you don't want to have an access to history.
func ProcessFromOperations(operations ...*Operation) *Process {
steps := make([]ProcessStep, 0, len(operations))
for _, operation := range operations {
steps = append(steps, ProcessStep{Operation: operation, ConfigFunc: nil})
}
return &Process{steps: steps}
}

// ProcessInterceptor is a function that is called by Process after one step finished but before next one is started.
type ProcessInterceptor func(in Message, out Message, cfg *OperationConfig, stepIndex uint)

// ProcessHistory stores results of the previous steps of the process. It's a process's execution context.
type ProcessHistory interface {
Get(stepIndex uint) (Message, error) // Get takes index (starts from zero) of the step which result we want to get
All() []Message // All allows to retrieve all the history of the previously processed steps
}

// processHistory implements ProcessHistory interfaces via simple slice of messages
type processHistory []Message

// Get is a panic-free way to get a message by index of the step. Indexes starts with zero. Index must be < steps count
func (p processHistory) Get(stepIndex uint) (Message, error) {
i := int(stepIndex)
if i >= len(p) {
return Message{}, errors.New("step index must less than the number of steps")
}
return p[i], nil
}

// All simply returns p as it is.
func (p processHistory) All() []Message {
return p
}

// Interceptor is a function that is called by Process after one operation finished but before next one is started.
type Interceptor func(in Message, out Message, cfg *OperationConfig)
// Execute loops over process steps and sequentially executes them by passing output of one step as an input to another.
// If interceptors are provided, they are called on each step. So for N steps and M interceptors there's N x M executions.
func (p *Process) Execute(ctx context.Context, input Message, interceptors ...ProcessInterceptor) (Message, error) {
history := make(processHistory, 0, len(p.steps))

for i, step := range p.steps {
if step.ConfigFunc != nil {
if err := step.ConfigFunc(history, step.Operation.config); err != nil {
return Message{}, fmt.Errorf("config func on step %d: %w", i, err)
}
}

// Execute iterates over Process's operations and sequentially executes them.
// After first operation is executed it uses its output as an input to the second one and so on until the whole chain is finished.
// It also executes all given interceptors, if they are provided, so for every N operations and M interceptors it's N x M executions.
func (p *Process) Execute(ctx context.Context, input Message, interceptors ...Interceptor) (Message, error) {
for _, operation := range p.operations {
output, err := operation.Execute(ctx, input)
output, err := step.Operation.Execute(ctx, input)
if err != nil {
return Message{}, err
}

history = append(history, output)

// FIXME while these are called AFTER operation and not before it's impossible to modify configuration
for _, interceptor := range interceptors {
interceptor(input, output, operation.Config())
interceptor(input, output, step.Operation.config, uint(i))
}

input = output
Expand Down