Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
irees committed Nov 3, 2023
1 parent 9f4b6a2 commit 515fca9
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 171 deletions.
31 changes: 24 additions & 7 deletions internal/meters/amberflo.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,34 @@ func (m *AmberFlo) getValue(user MeterUser, meterName string, startTime time.Tim
if cfg.Name == "" {
return 0, false
}

startTimeInSeconds := (time.Now().In(time.UTC).UnixNano() / int64(time.Second)) - (24 * 60 * 60)
timeRange := &metering.TimeRange{
StartTimeInSeconds: startTimeInSeconds,
StartTimeInSeconds: startTime.In(time.UTC).Unix(),
EndTimeInSeconds: endTime.In(time.UTC).Unix(),
}
if timeRange.EndTimeInSeconds > time.Now().In(time.UTC).Unix() {
timeRange.EndTimeInSeconds = 0
}

filter := make(map[string][]string)
filter["customerId"] = []string{customerId}
for _, dim := range checkDims {
filter[dim.Key] = []string{dim.Value}
}

timeGroupingInterval := metering.Hour
switch timeSpan := endTime.Unix() - startTime.Unix(); {
case timeSpan > 24*60*60:
timeGroupingInterval = metering.Month
case timeSpan > 60*60:
timeGroupingInterval = metering.Day
default:
timeGroupingInterval = metering.Hour
}

usageResult, err := m.usageClient.GetUsage(&metering.UsagePayload{
MeterApiName: cfg.Name,
Aggregation: metering.Sum,
TimeGroupingInterval: metering.Day,
TimeGroupingInterval: timeGroupingInterval,
GroupBy: []string{"customerId"},
TimeRange: timeRange,
Filter: filter,
Expand All @@ -114,9 +131,9 @@ func (m *AmberFlo) getValue(user MeterUser, meterName string, startTime time.Tim
log.Error().Err(err).Str("user", user.ID()).Msg("could not get value; no client value meter")
return 0, false
}
cm := usageResult.ClientMeters[0].Values
cmv := cm[len(cm)-1].Value
return cmv, true

total := usageResult.ClientMeters[0].GroupValue
return total, true
}

func (m *AmberFlo) sendMeter(user MeterUser, meterName string, value float64, extraDimensions Dimensions) error {
Expand Down
62 changes: 0 additions & 62 deletions internal/meters/cache.go

This file was deleted.

15 changes: 0 additions & 15 deletions internal/meters/cache_test.go

This file was deleted.

68 changes: 39 additions & 29 deletions internal/meters/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,18 @@ package meters

import (
"errors"
"fmt"
"time"

"github.com/rs/zerolog/log"
)

func init() {
var _ MeterProvider = &LimitMeterProvider{}
}

type userMeterLimit struct {
User string
MeterName string
Dims Dimensions
Period string
Limit float64
}

type LimitMeterProvider struct {
Enabled bool
UserLimits map[string][]userMeterLimit
MeterProvider
}
Expand Down Expand Up @@ -46,7 +42,7 @@ type LimitMeter struct {
ApiMeter
}

func (c *LimitMeter) GetLimit(meterName string, checkDims Dimensions) (time.Time, time.Time, float64, bool) {
func (c *LimitMeter) GetLimit(meterName string, checkDims Dimensions) (userMeterLimit, bool) {
var lim userMeterLimit
found := false
for _, checkLim := range c.provider.UserLimits[c.userId] {
Expand All @@ -57,13 +53,42 @@ func (c *LimitMeter) GetLimit(meterName string, checkDims Dimensions) (time.Time
}
}
if !found {
// fmt.Println("no limit found")
return time.Now(), time.Now(), 0, false
return lim, false
}
return lim, true
}

func (c *LimitMeter) Meter(meterName string, value float64, extraDimensions Dimensions) error {
lim, foundLimit := c.GetLimit(meterName, extraDimensions)
d1, d2 := lim.Span()
if c.provider.Enabled && foundLimit {
currentValue, _ := c.GetValue(meterName, d1, d2, extraDimensions)
if foundLimit && currentValue+value > lim.Limit {
log.Info().Str("meter", meterName).Str("user", c.userId).Float64("current", currentValue).Float64("add", value).Str("dims", fmt.Sprintf("%v", extraDimensions)).Msg("rate limited")
return errors.New("rate limited")
} else {
log.Info().Str("meter", meterName).Str("user", c.userId).Float64("current", currentValue).Float64("add", value).Str("dims", fmt.Sprintf("%v", extraDimensions)).Msg("rate check")
}
}
return c.ApiMeter.Meter(meterName, value, extraDimensions)
}

type userMeterLimit struct {
User string
MeterName string
Dims Dimensions
Period string
Limit float64
}

func (lim *userMeterLimit) Span() (time.Time, time.Time) {
now := time.Now().In(time.UTC)
d1 := now
d2 := now
if lim.Period == "day" {
if lim.Period == "hour" {
d1 = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC)
d2 = d1.Add(3600 * time.Second)
} else if lim.Period == "day" {
d1 = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
d2 = d1.AddDate(0, 0, 1)
} else if lim.Period == "month" {
Expand All @@ -76,22 +101,7 @@ func (c *LimitMeter) GetLimit(meterName string, checkDims Dimensions) (time.Time
d1 = time.Unix(0, 0)
d2 = time.Unix(1<<63-1, 0)
} else {
return now, now, 0, false
return now, now
}
// fmt.Println("limit found:", d1, d2, lim.Limit, lim.Period)
return d1, d2, lim.Limit, true
}

func (c *LimitMeter) Meter(meterName string, value float64, extraDimensions Dimensions) error {
d1, d2, lim, foundLimit := c.GetLimit(meterName, extraDimensions)
a, valueFound := c.GetValue(meterName, d1, d2, extraDimensions)
_ = valueFound
// if !valueFound {
// fmt.Println("value not found")
// }
// fmt.Println("a:", a, "value:", value, "lim:", lim, "extraDims:", extraDimensions)
if foundLimit && a+value > lim {
return errors.New("rate limited")
}
return c.ApiMeter.Meter(meterName, value, extraDimensions)
return d1, d2
}
105 changes: 62 additions & 43 deletions internal/meters/limit_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package meters

import (
"fmt"
"math"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -13,6 +13,7 @@ func TestLimitMeter(t *testing.T) {
user := testUser{name: "testuser"}
mp := NewDefaultMeterProvider()
cmp := NewLimitMeterProvider(mp)
cmp.Enabled = true
testLimitMeter(t, cmp, meterName, user)
}

Expand All @@ -23,61 +24,79 @@ func TestLimitMeter_Amberflo(t *testing.T) {
return
}
cmp := NewLimitMeterProvider(mp)
cmp.Enabled = true
testLimitMeter(t, cmp, testConfig.testMeter1, testUser{name: testConfig.user1.ID()})
}

func testLimitMeter(t *testing.T, cmp *LimitMeterProvider, meterName string, user testUser) {
m := cmp.NewMeter(user)
testDims1 := Dimensions{{Key: "ok", Value: "test"}}
testDims2 := Dimensions{{Key: "ok", Value: "bar"}}

lim1 := 10.0
lim2 := 11.0
incr := 3.0

cmp.UserLimits[user.name] = append(cmp.UserLimits[user.name],
userMeterLimit{
testKey := 1 // time.Now().In(time.UTC).Unix()
lims := []userMeterLimit{
// foo tests
{
MeterName: meterName,
Period: "hour",
Limit: 5.0,
Dims: Dimensions{{Key: "ok", Value: fmt.Sprintf("foo:%d", testKey)}},
},
{
MeterName: meterName,
Period: "day",
Limit: 8.0,
Dims: Dimensions{{Key: "ok", Value: fmt.Sprintf("foo:%d", testKey)}},
},
{
MeterName: meterName,
Period: "month",
Limit: lim1,
Dims: testDims1,
Limit: 11.0,
Dims: Dimensions{{Key: "ok", Value: fmt.Sprintf("foo:%d", testKey)}},
},
// bar tests
{
MeterName: meterName,
Period: "hour",
Limit: 14.0,
Dims: Dimensions{{Key: "ok", Value: fmt.Sprintf("bar:%d", testKey)}},
},
userMeterLimit{
{
MeterName: meterName,
Period: "day",
Limit: lim2,
Dims: testDims2,
Limit: 17.0,
Dims: Dimensions{{Key: "ok", Value: fmt.Sprintf("bar:%d", testKey)}},
},
{
MeterName: meterName,
Period: "month",
Limit: 20.0,
Dims: Dimensions{{Key: "ok", Value: fmt.Sprintf("bar:%d", testKey)}},
},
)

// 1
successCount1 := 0.0
for i := 0; i < 10; i++ {
err := m.Meter(meterName, incr, testDims1)
if err == nil {
successCount1 += 1
}
cmp.Flush()
}
assert.Equal(t, successCount1, math.Floor(lim1/incr))

// 2
successCount2 := 0.0
for i := 0; i < 10; i++ {
err := m.Meter(meterName, incr, testDims2)
if err == nil {
successCount2 += 1
}
cmp.Flush()
}
assert.Equal(t, successCount2, math.Floor(lim2/incr))

// total 1
v1, _ := m.GetValue(meterName, time.Unix(0, 0), time.Now(), testDims1)
assert.Equal(t, successCount1*incr, v1)
incr := 3.0
for _, lim := range lims {
t.Run(fmt.Sprintf("%v", lim), func(t *testing.T) {
startTime, endTime := lim.Span()
base, _ := m.GetValue(meterName, startTime, endTime, lim.Dims)
lim.Limit += base
cmp.UserLimits[user.name] = []userMeterLimit{lim}

// total 2
v2, _ := m.GetValue(meterName, time.Unix(0, 0), time.Now(), testDims2)
assert.Equal(t, successCount2*incr, v2)
successCount := 0.0
for i := 0; i < 10; i++ {
err := m.Meter(meterName, incr, lim.Dims)
if err == nil {
successCount += 1
}
cmp.Flush()
}
expectCount := math.Floor((lim.Limit - base) / incr)
// fmt.Println("successCount:", successCount, "expectCount:", expectCount)
assert.Equal(t, expectCount, successCount)
total, _ := m.GetValue(meterName, startTime, endTime, lim.Dims)
total = total - base
expectTotal := successCount * incr
// fmt.Println("total:", total, "expectTotal:", expectTotal)
assert.Equal(t, expectTotal, total)
})
}

}
Loading

0 comments on commit 515fca9

Please sign in to comment.