Skip to content

Commit

Permalink
feat: rate limit err log record
Browse files Browse the repository at this point in the history
  • Loading branch information
zijiren233 committed Jan 5, 2025
1 parent c6fd01a commit c62b4ad
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 90 deletions.
13 changes: 11 additions & 2 deletions service/aiproxy/common/consume/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func Wait() {
}

func AsyncConsume(
ctx context.Context,
postGroupConsumer balance.PostGroupConsumer,
code int,
usage *relaymodel.Usage,
Expand All @@ -41,7 +40,17 @@ func AsyncConsume(
}
}()

go Consume(ctx, postGroupConsumer, code, usage, meta, inputPrice, outputPrice, content, requestDetail)
go Consume(
context.Background(),
postGroupConsumer,
code,
usage,
meta,
inputPrice,
outputPrice,
content,
requestDetail,
)
}

func Consume(
Expand Down
122 changes: 81 additions & 41 deletions service/aiproxy/common/rpmlimit/rate-limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,89 @@ const (
groupModelRPMKey = "group_model_rpm:%s:%s"
)

// 1. 使用Redis列表存储请求时间戳
// 2. 列表长度代表当前窗口内的请求数
// 3. 如果请求数未达到限制,直接添加新请求并返回成功
// 4. 如果达到限制,则检查最老的请求是否已经过期
// 5. 如果最老的请求已过期,最多移除3个过期请求并添加新请求,否则拒绝新请求
// 6. 通过EXPIRE命令设置键的过期时间,自动清理过期数据
var luaScript = `
var pushRequestScript = `
local key = KEYS[1]
local max_requests = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local window = tonumber(ARGV[1])
local current_time = tonumber(ARGV[2])
local cutoff = current_time - window
local page_size = 100
local remove_count = 0
local count = redis.call('LLEN', key)
if count < max_requests then
redis.call('LPUSH', key, current_time)
redis.call('PEXPIRE', key, window)
return 1
else
local removed = 0
for i = 1, 3 do
local oldest = redis.call('LINDEX', key, -1)
if current_time - tonumber(oldest) >= window then
redis.call('RPOP', key)
removed = removed + 1
while true do
local timestamps = redis.call('LRANGE', key, remove_count, remove_count + page_size - 1)
if #timestamps == 0 then
break
end
local found_non_expired = false
for i = 1, #timestamps do
local timestamp = tonumber(timestamps[i])
if timestamp < cutoff then
remove_count = remove_count + 1
else
found_non_expired = true
break
end
end
if removed > 0 then
redis.call('LPUSH', key, current_time)
redis.call('PEXPIRE', key, window)
return 1
else
return 0
if found_non_expired then
break
end
end
if remove_count > 0 then
redis.call('LTRIM', key, remove_count, -1)
end
redis.call('LPUSH', key, current_time)
redis.call('PEXPIRE', key, window)
return redis.call('LLEN', key)
`

var getRPMSumLuaScript = `
var getRequestCountScript = `
local pattern = ARGV[1]
local window = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local cutoff = current_time - window
local page_size = 100
local keys = redis.call('KEYS', pattern)
local total = 0
for _, key in ipairs(keys) do
local timestamps = redis.call('LRANGE', key, 0, -1)
for _, ts in ipairs(timestamps) do
if current_time - tonumber(ts) < window then
total = total + 1
local remove_count = 0
while true do
local timestamps = redis.call('LRANGE', key, remove_count, remove_count + page_size - 1)
if #timestamps == 0 then
break
end
local found_non_expired = false
for i = 1, #timestamps do
local timestamp = tonumber(timestamps[i])
if timestamp < cutoff then
remove_count = remove_count + 1
else
found_non_expired = true
break
end
end
if found_non_expired then
break
end
end
if remove_count > 0 then
redis.call('LTRIM', key, remove_count, -1)
end
local total_count = redis.call('LLEN', key)
total = total + total_count
end
return total
Expand All @@ -93,24 +122,35 @@ func GetRPM(ctx context.Context, group, model string) (int64, error) {

rdb := common.RDB
currentTime := time.Now().UnixMilli()
result, err := rdb.Eval(ctx, getRPMSumLuaScript, []string{}, pattern, time.Minute.Milliseconds(), currentTime).Int64()
result, err := rdb.Eval(
ctx,
getRequestCountScript,
[]string{},
pattern,
time.Minute.Milliseconds(),
currentTime,
).Int64()
if err != nil {
return 0, err
}

return result, nil
}

func redisRateLimitRequest(ctx context.Context, group, model string, maxRequestNum int64, duration time.Duration) (bool, error) {
rdb := common.RDB
currentTime := time.Now().UnixMilli()
result, err := rdb.Eval(ctx, luaScript, []string{
fmt.Sprintf(groupModelRPMKey, group, model),
}, maxRequestNum, duration.Milliseconds(), currentTime).Int64()
result, err := rdb.Eval(
ctx,
pushRequestScript,
[]string{
fmt.Sprintf(groupModelRPMKey, group, model),
},
duration.Milliseconds(),
time.Now().UnixMilli(),
).Int64()
if err != nil {
return false, err
}
return result == 1, nil
return result <= maxRequestNum, nil
}

func RateLimit(ctx context.Context, group, model string, maxRequestNum int64, duration time.Duration) (bool, error) {
Expand Down
59 changes: 50 additions & 9 deletions service/aiproxy/middleware/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ import (
"fmt"
"net/http"
"slices"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/labring/sealos/service/aiproxy/common"
"github.com/labring/sealos/service/aiproxy/common/config"
"github.com/labring/sealos/service/aiproxy/common/consume"
"github.com/labring/sealos/service/aiproxy/common/ctxkey"
"github.com/labring/sealos/service/aiproxy/common/rpmlimit"
"github.com/labring/sealos/service/aiproxy/model"
"github.com/labring/sealos/service/aiproxy/relay/meta"
log "github.com/sirupsen/logrus"
)

type ModelRequest struct {
Model string `form:"model" json:"model"`
}

func calculateGroupConsumeLevelRpmRatio(usedAmount float64) float64 {
v := config.GetGroupConsumeLevelRpmRatio()
var maxConsumeLevel float64 = -1
Expand Down Expand Up @@ -90,7 +89,13 @@ func checkGroupModelRPMAndTPM(c *gin.Context, group *model.GroupCache, requestMo
return nil
}

func Distribute(c *gin.Context) {
func NewDistribute(mode int) gin.HandlerFunc {
return func(c *gin.Context) {
distribute(c, mode)
}
}

func distribute(c *gin.Context, mode int) {
if config.GetDisableServe() {
abortWithMessage(c, http.StatusServiceUnavailable, "service is under maintenance")
return
Expand All @@ -110,6 +115,8 @@ func Distribute(c *gin.Context) {
return
}

c.Set(ctxkey.OriginalModel, requestModel)

SetLogModelFields(log.Data, requestModel)

mc, ok := GetModelCaches(c).ModelConfigMap[requestModel]
Expand All @@ -118,7 +125,10 @@ func Distribute(c *gin.Context) {
return
}

c.Set(ctxkey.ModelConfig, mc)

token := GetToken(c)

if len(token.Models) == 0 || !slices.Contains(token.Models, requestModel) {
abortWithMessage(c,
http.StatusForbidden,
Expand All @@ -130,13 +140,21 @@ func Distribute(c *gin.Context) {
}

if err := checkGroupModelRPMAndTPM(c, group, requestModel, mc.RPM, mc.TPM); err != nil {
abortWithMessage(c, http.StatusTooManyRequests, err.Error())
errMsg := err.Error()
consume.AsyncConsume(
nil,
http.StatusTooManyRequests,
nil,
NewMetaByContext(c, nil, requestModel, mode),
0,
0,
errMsg,
nil,
)
abortWithMessage(c, http.StatusTooManyRequests, errMsg)
return
}

c.Set(ctxkey.OriginalModel, requestModel)
c.Set(ctxkey.ModelConfig, mc)

c.Next()
}

Expand Down Expand Up @@ -164,3 +182,26 @@ func NewMetaByContext(c *gin.Context, channel *model.Channel, modelName string,
meta.WithEndpoint(c.Request.URL.Path),
)
}

type ModelRequest struct {
Model string `form:"model" json:"model"`
}

func getRequestModel(c *gin.Context) (string, error) {
path := c.Request.URL.Path
switch {
case strings.HasPrefix(path, "/v1/audio/transcriptions"),
strings.HasPrefix(path, "/v1/audio/translations"):
return c.Request.FormValue("model"), nil
case strings.HasPrefix(path, "/v1/engines") && strings.HasSuffix(path, "/embeddings"):
// /engines/:model/embeddings
return c.Param("model"), nil
default:
var modelRequest ModelRequest
err := common.UnmarshalBodyReusable(c.Request, &modelRequest)
if err != nil {
return "", fmt.Errorf("get request model failed: %w", err)
}
return modelRequest.Model, nil
}
}
20 changes: 0 additions & 20 deletions service/aiproxy/middleware/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package middleware

import (
"fmt"
"strings"

"github.com/gin-gonic/gin"
"github.com/labring/sealos/service/aiproxy/common"
"github.com/labring/sealos/service/aiproxy/relay/model"
)

Expand All @@ -27,21 +25,3 @@ func abortWithMessage(c *gin.Context, statusCode int, message string) {
})
c.Abort()
}

func getRequestModel(c *gin.Context) (string, error) {
path := c.Request.URL.Path
switch {
case strings.HasPrefix(path, "/v1/audio/transcriptions"), strings.HasPrefix(path, "/v1/audio/translations"):
return c.Request.FormValue("model"), nil
case strings.HasPrefix(path, "/v1/engines") && strings.HasSuffix(path, "/embeddings"):
// /engines/:model/embeddings
return c.Param("model"), nil
default:
var modelRequest ModelRequest
err := common.UnmarshalBodyReusable(c.Request, &modelRequest)
if err != nil {
return "", fmt.Errorf("get request model failed: %w", err)
}
return modelRequest.Model, nil
}
}
5 changes: 0 additions & 5 deletions service/aiproxy/relay/controller/handle.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controller

import (
"context"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -37,7 +36,6 @@ func Handle(meta *meta.Meta, c *gin.Context, preProcess func() (*PreCheckGroupBa
log.Errorf("get group (%s) balance failed: %v", meta.Group.ID, err)
errMsg := fmt.Sprintf("get group (%s) balance failed", meta.Group.ID)
consume.AsyncConsume(
context.Background(),
nil,
http.StatusInternalServerError,
nil,
Expand Down Expand Up @@ -68,7 +66,6 @@ func Handle(meta *meta.Meta, c *gin.Context, preProcess func() (*PreCheckGroupBa
}
}
consume.AsyncConsume(
context.Background(),
nil,
http.StatusBadRequest,
nil,
Expand Down Expand Up @@ -104,7 +101,6 @@ func Handle(meta *meta.Meta, c *gin.Context, preProcess func() (*PreCheckGroupBa
}

consume.AsyncConsume(
context.Background(),
postGroupConsumer,
respErr.StatusCode,
usage,
Expand All @@ -119,7 +115,6 @@ func Handle(meta *meta.Meta, c *gin.Context, preProcess func() (*PreCheckGroupBa

// 6. Post consume
consume.AsyncConsume(
context.Background(),
postGroupConsumer,
http.StatusOK,
usage,
Expand Down
5 changes: 4 additions & 1 deletion service/aiproxy/relay/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func NewMeta(
values: make(map[string]any),
Mode: mode,
OriginModel: modelName,
ActualModel: modelName,
RequestAt: time.Now(),
ModelConfig: modelConfig,
}
Expand All @@ -89,7 +90,9 @@ func NewMeta(
opt(&meta)
}

meta.Reset(channel)
if channel != nil {
meta.Reset(channel)
}

return &meta
}
Expand Down
Loading

0 comments on commit c62b4ad

Please sign in to comment.