Skip to content

Commit

Permalink
integrate event driven architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
spikelu2016 committed Feb 6, 2024
1 parent f84046e commit 7001d14
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 521 deletions.
26 changes: 25 additions & 1 deletion cmd/bricksllm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/bricks-cloud/bricksllm/internal/config"
"github.com/bricks-cloud/bricksllm/internal/logger/zap"
"github.com/bricks-cloud/bricksllm/internal/manager"
"github.com/bricks-cloud/bricksllm/internal/message"
"github.com/bricks-cloud/bricksllm/internal/provider/anthropic"
"github.com/bricks-cloud/bricksllm/internal/provider/azure"
"github.com/bricks-cloud/bricksllm/internal/provider/custom"
Expand Down Expand Up @@ -171,10 +172,23 @@ func main() {
log.Sugar().Fatalf("error connecting to api redis cache: %v", err)
}

accessRedisCache := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%s", cfg.RedisHosts, cfg.RedisPort),
Password: cfg.RedisPassword,
DB: 4,
})

ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := apiRedisCache.Ping(ctx).Err(); err != nil {
log.Sugar().Fatalf("error connecting to api redis cache: %v", err)
}

rateLimitCache := redisStorage.NewCache(rateLimitRedisCache, cfg.RedisWriteTimeout, cfg.RedisReadTimeout)
costLimitCache := redisStorage.NewCache(costLimitRedisCache, cfg.RedisWriteTimeout, cfg.RedisReadTimeout)
costStorage := redisStorage.NewStore(costRedisStorage, cfg.RedisWriteTimeout, cfg.RedisReadTimeout)
apiCache := redisStorage.NewCache(apiRedisCache, cfg.RedisWriteTimeout, cfg.RedisReadTimeout)
accessCache := redisStorage.NewAccessCache(accessRedisCache, cfg.RedisWriteTimeout, cfg.RedisReadTimeout)

m := manager.NewManager(store)
krm := manager.NewReportingManager(costStorage, store, store)
Expand Down Expand Up @@ -209,7 +223,16 @@ func main() {

c := cache.NewCache(apiCache)

ps, err := proxy.NewProxyServer(log, *modePtr, *privacyPtr, c, m, rm, a, psm, cpm, store, memStore, ce, ace, aoe, v, rec, rlm, cfg.ProxyTimeout)
messageBus := message.NewMessageBus()
eventMessageChan := make(chan message.Message)
messageBus.Subscribe("event", eventMessageChan)

handler := message.NewHandler(rec, log, ace, ce, aoe, v, m, rlm, accessCache)

eventConsumer := message.NewConsumer(eventMessageChan, log, 4, handler.HandleEventWithRequestAndResponse)
eventConsumer.StartEventMessageConsumers()

ps, err := proxy.NewProxyServer(log, *modePtr, *privacyPtr, c, m, rm, a, psm, cpm, store, memStore, ce, ace, aoe, v, rec, messageBus, rlm, cfg.ProxyTimeout, accessCache)
if err != nil {
log.Sugar().Fatalf("error creating proxy http server: %v", err)
}
Expand All @@ -220,6 +243,7 @@ func main() {
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

eventConsumer.Stop()
memStore.Stop()
psMemStore.Stop()
cpMemStore.Stop()
Expand Down
269 changes: 0 additions & 269 deletions cmd/tool/main.go

This file was deleted.

Loading

0 comments on commit 7001d14

Please sign in to comment.