Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
irees committed Nov 4, 2023
1 parent 515fca9 commit f1ae895
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 115 deletions.
24 changes: 12 additions & 12 deletions internal/meters/amberflo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"github.com/xtgo/uuid"
)

type AmberFlo struct {
type Amberflo struct {
apikey string
interval time.Duration
client *metering.Metering
usageClient *metering.UsageClient
cfgs map[string]amberFloConfig
}

func NewAmberFlo(apikey string, interval time.Duration, batchSize int) *AmberFlo {
func NewAmberflo(apikey string, interval time.Duration, batchSize int) *Amberflo {
afLog := &amberfloLogger{logger: log.Logger}
meteringClient := metering.NewMeteringClient(
apikey,
Expand All @@ -32,7 +32,7 @@ func NewAmberFlo(apikey string, interval time.Duration, batchSize int) *AmberFlo
apikey,
metering.WithCustomLogger(afLog),
)
return &AmberFlo{
return &Amberflo{
apikey: apikey,
interval: interval,
client: meteringClient,
Expand All @@ -48,7 +48,7 @@ type amberFloConfig struct {
Dimensions Dimensions `json:"dimensions,omitempty"`
}

func (m *AmberFlo) LoadConfig(path string) error {
func (m *Amberflo) LoadConfig(path string) error {
cfgs := map[string]amberFloConfig{}
data, err := ioutil.ReadFile(path)
if err != nil {
Expand All @@ -61,24 +61,24 @@ func (m *AmberFlo) LoadConfig(path string) error {
return nil
}

func (m *AmberFlo) NewMeter(user MeterUser) ApiMeter {
func (m *Amberflo) NewMeter(user MeterUser) ApiMeter {
return &amberFloMeter{
user: user,
mp: m,
}
}

func (m *AmberFlo) Close() error {
func (m *Amberflo) Close() error {
return m.client.Shutdown()
}

func (m *AmberFlo) Flush() error {
func (m *Amberflo) Flush() error {
// metering.Flush() // in API docs but not in library
time.Sleep(m.interval)
return nil
}

func (m *AmberFlo) getValue(user MeterUser, meterName string, startTime time.Time, endTime time.Time, checkDims Dimensions) (float64, bool) {
func (m *Amberflo) getValue(user MeterUser, meterName string, startTime time.Time, endTime time.Time, checkDims Dimensions) (float64, bool) {
// TODO: time period and aggregation is hardcoded as 1 day
cfg, ok := m.getcfg(meterName)
if !ok {
Expand Down Expand Up @@ -136,7 +136,7 @@ func (m *AmberFlo) getValue(user MeterUser, meterName string, startTime time.Tim
return total, true
}

func (m *AmberFlo) sendMeter(user MeterUser, meterName string, value float64, extraDimensions Dimensions) error {
func (m *Amberflo) sendMeter(user MeterUser, meterName string, value float64, extraDimensions Dimensions) error {
cfg, ok := m.getcfg(meterName)
if !ok {
return nil
Expand Down Expand Up @@ -165,7 +165,7 @@ func (m *AmberFlo) sendMeter(user MeterUser, meterName string, value float64, ex
})
}

func (m *AmberFlo) getCustomerID(cfg amberFloConfig, user MeterUser) (string, bool) {
func (m *Amberflo) getCustomerID(cfg amberFloConfig, user MeterUser) (string, bool) {
customerId := cfg.DefaultUser
if user != nil {
eidKey := cfg.ExternalIDKey
Expand All @@ -182,7 +182,7 @@ func (m *AmberFlo) getCustomerID(cfg amberFloConfig, user MeterUser) (string, bo
return customerId, customerId != ""
}

func (m *AmberFlo) getcfg(meterName string) (amberFloConfig, bool) {
func (m *Amberflo) getcfg(meterName string) (amberFloConfig, bool) {
cfg, ok := m.cfgs[meterName]
if !ok {
cfg = amberFloConfig{
Expand All @@ -201,7 +201,7 @@ func (m *AmberFlo) getcfg(meterName string) (amberFloConfig, bool) {
type amberFloMeter struct {
user MeterUser
addDims []eventAddDim
mp *AmberFlo
mp *Amberflo
}

func (m *amberFloMeter) Meter(meterName string, value float64, extraDimensions Dimensions) error {
Expand Down
44 changes: 19 additions & 25 deletions internal/meters/amberflo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,16 @@ import (
"github.com/interline-io/transitland-server/internal/testutil"
)

type amberfloTestUser struct {
name string
}

func (u *amberfloTestUser) ID() string {
return u.name
}

func (u *amberfloTestUser) GetExternalData(eid string) (string, bool) {
// must match key given in config below
if eid == "amberflo" {
return u.name, true
}
return "", false
}

func TestAmberFloMeter(t *testing.T) {
mp, testConfig, err := getTestAmberFloMeter()
func TestAmberfloMeter(t *testing.T) {
mp, testConfig, err := getTestAmberfloMeter()
if err != nil {
t.Skip(err.Error())
return
}
testMeter(t, mp, testConfig)
}

func getTestAmberFloMeter() (*AmberFlo, testMeterConfig, error) {
func getTestAmberfloMeter() (*Amberflo, testMeterConfig, error) {
checkKeys := []string{
"TL_TEST_AMBERFLO_APIKEY",
"TL_TEST_AMBERFLO_METER1",
Expand All @@ -49,15 +33,25 @@ func getTestAmberFloMeter() (*AmberFlo, testMeterConfig, error) {
return nil, testMeterConfig{}, errors.New(a)
}
}
eidKey := "amberflo"
testConfig := testMeterConfig{
testMeter1: os.Getenv("TL_TEST_AMBERFLO_METER1"),
testMeter2: os.Getenv("TL_TEST_AMBERFLO_METER2"),
user1: &amberfloTestUser{name: os.Getenv("TL_TEST_AMBERFLO_USER1")},
user2: &amberfloTestUser{name: os.Getenv("TL_TEST_AMBERFLO_USER2")},
user3: &amberfloTestUser{name: os.Getenv("TL_TEST_AMBERFLO_USER3")},
user1: &testUser{
name: os.Getenv("TL_TEST_AMBERFLO_USER1"),
data: map[string]string{eidKey: os.Getenv("TL_TEST_AMBERFLO_USER1")},
},
user2: &testUser{
name: os.Getenv("TL_TEST_AMBERFLO_USER2"),
data: map[string]string{eidKey: os.Getenv("TL_TEST_AMBERFLO_USER2")},
},
user3: &testUser{
name: os.Getenv("TL_TEST_AMBERFLO_USER3"),
data: map[string]string{eidKey: os.Getenv("TL_TEST_AMBERFLO_USER3")},
},
}
mp := NewAmberFlo(os.Getenv("TL_TEST_AMBERFLO_APIKEY"), 1*time.Second, 1)
mp.cfgs[testConfig.testMeter1] = amberFloConfig{Name: testConfig.testMeter1, ExternalIDKey: "amberflo"}
mp.cfgs[testConfig.testMeter2] = amberFloConfig{Name: testConfig.testMeter2, ExternalIDKey: "amberflo"}
mp := NewAmberflo(os.Getenv("TL_TEST_AMBERFLO_APIKEY"), 1*time.Second, 1)
mp.cfgs[testConfig.testMeter1] = amberFloConfig{Name: testConfig.testMeter1, ExternalIDKey: eidKey}
mp.cfgs[testConfig.testMeter2] = amberFloConfig{Name: testConfig.testMeter2, ExternalIDKey: eidKey}
return mp, testConfig, nil
}
73 changes: 47 additions & 26 deletions internal/meters/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import (
"time"

"github.com/rs/zerolog/log"
"github.com/tidwall/gjson"
)

func init() {
var _ MeterProvider = &LimitMeterProvider{}
}

type LimitMeterProvider struct {
Enabled bool
UserLimits map[string][]userMeterLimit
Enabled bool
DefaultLimits []userMeterLimit
MeterProvider
}

func NewLimitMeterProvider(provider MeterProvider) *LimitMeterProvider {
return &LimitMeterProvider{
MeterProvider: provider,
UserLimits: map[string][]userMeterLimit{},
}
}

Expand All @@ -42,32 +42,32 @@ type LimitMeter struct {
ApiMeter
}

func (c *LimitMeter) GetLimit(meterName string, checkDims Dimensions) (userMeterLimit, bool) {
var lim userMeterLimit
found := false
for _, checkLim := range c.provider.UserLimits[c.userId] {
func (c *LimitMeter) GetLimits(meterName string, checkDims Dimensions) []userMeterLimit {
var lims []userMeterLimit
for _, checkLim := range parseGkUserLimits(c.userData) {
if checkLim.MeterName == meterName && matchDims(checkDims, checkLim.Dims) {
found = true
lim = checkLim
break
lims = append(lims, checkLim)
}
}
if !found {
return lim, false
for _, checkLim := range c.provider.DefaultLimits {
if checkLim.MeterName == meterName && matchDims(checkDims, checkLim.Dims) {
lims = append(lims, checkLim)
}
}
return lim, true
return lims
}

func (c *LimitMeter) Meter(meterName string, value float64, extraDimensions Dimensions) error {
lim, foundLimit := c.GetLimit(meterName, extraDimensions)
d1, d2 := lim.Span()
if c.provider.Enabled && foundLimit {
currentValue, _ := c.GetValue(meterName, d1, d2, extraDimensions)
if foundLimit && currentValue+value > lim.Limit {
log.Info().Str("meter", meterName).Str("user", c.userId).Float64("current", currentValue).Float64("add", value).Str("dims", fmt.Sprintf("%v", extraDimensions)).Msg("rate limited")
return errors.New("rate limited")
} else {
log.Info().Str("meter", meterName).Str("user", c.userId).Float64("current", currentValue).Float64("add", value).Str("dims", fmt.Sprintf("%v", extraDimensions)).Msg("rate check")
if c.provider.Enabled {
for _, lim := range c.GetLimits(meterName, extraDimensions) {
d1, d2 := lim.Span()
currentValue, _ := c.GetValue(meterName, d1, d2, extraDimensions)
if currentValue+value > lim.Limit {
log.Info().Str("meter", meterName).Str("user", c.userId).Float64("current", currentValue).Float64("add", value).Str("dims", fmt.Sprintf("%v", extraDimensions)).Msg("rate limited")
return errors.New("rate limited")
} else {
log.Info().Str("meter", meterName).Str("user", c.userId).Float64("current", currentValue).Float64("add", value).Str("dims", fmt.Sprintf("%v", extraDimensions)).Msg("rate check ok")
}
}
}
return c.ApiMeter.Meter(meterName, value, extraDimensions)
Expand All @@ -85,16 +85,16 @@ func (lim *userMeterLimit) Span() (time.Time, time.Time) {
now := time.Now().In(time.UTC)
d1 := now
d2 := now
if lim.Period == "hour" {
if lim.Period == "hourly" {
d1 = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC)
d2 = d1.Add(3600 * time.Second)
} else if lim.Period == "day" {
} else if lim.Period == "daily" {
d1 = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
d2 = d1.AddDate(0, 0, 1)
} else if lim.Period == "month" {
} else if lim.Period == "monthly" {
d1 = time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
d2 = d1.AddDate(0, 1, 0)
} else if lim.Period == "year" {
} else if lim.Period == "yearly" {
d1 = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.UTC)
d2 = d1.AddDate(1, 0, 0)
} else if lim.Period == "total" {
Expand All @@ -105,3 +105,24 @@ func (lim *userMeterLimit) Span() (time.Time, time.Time) {
}
return d1, d2
}

func parseGkUserLimits(v string) []userMeterLimit {
var lims []userMeterLimit
for _, productLimit := range gjson.Get(v, "product_limits").Map() {
for _, plim := range productLimit.Array() {
lim := userMeterLimit{
MeterName: plim.Get("amberflo_meter").String(),
Limit: plim.Get("limit_value").Float(),
Period: plim.Get("time_period").String(),
}
if dim := plim.Get("amberflo_dimension").String(); dim != "" {
lim.Dims = append(lim.Dims, Dimension{
Key: dim,
Value: plim.Get("amberflo_dimension_value").String(),
})
}
lims = append(lims, lim)
}
}
return lims
}
Loading

0 comments on commit f1ae895

Please sign in to comment.