Skip to content

Commit

Permalink
Rate limits (#188)
Browse files Browse the repository at this point in the history
* Rate limiting infrastructure
  • Loading branch information
irees authored Nov 7, 2023
1 parent fe22d56 commit 223339b
Show file tree
Hide file tree
Showing 11 changed files with 555 additions and 144 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
go.work
/*.zip
tmp
tmp
run.sh
113 changes: 62 additions & 51 deletions internal/meters/amberflo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"github.com/xtgo/uuid"
)

type AmberFlo struct {
type Amberflo struct {
apikey string
interval time.Duration
client *metering.Metering
usageClient *metering.UsageClient
cfgs map[string]amberFloConfig
}

func NewAmberFlo(apikey string, interval time.Duration, batchSize int) *AmberFlo {
func NewAmberflo(apikey string, interval time.Duration, batchSize int) *Amberflo {
afLog := &amberfloLogger{logger: log.Logger}
meteringClient := metering.NewMeteringClient(
apikey,
Expand All @@ -32,7 +32,7 @@ func NewAmberFlo(apikey string, interval time.Duration, batchSize int) *AmberFlo
apikey,
metering.WithCustomLogger(afLog),
)
return &AmberFlo{
return &Amberflo{
apikey: apikey,
interval: interval,
client: meteringClient,
Expand All @@ -42,13 +42,13 @@ func NewAmberFlo(apikey string, interval time.Duration, batchSize int) *AmberFlo
}

type amberFloConfig struct {
Name string `json:"name,omitempty"`
DefaultUser string `json:"default_user,omitempty"`
ExternalIDKey string `json:"external_id_key,omitempty"`
Dimensions map[string]string `json:"dimensions,omitempty"`
Name string `json:"name,omitempty"`
DefaultUser string `json:"default_user,omitempty"`
ExternalIDKey string `json:"external_id_key,omitempty"`
Dimensions Dimensions `json:"dimensions,omitempty"`
}

func (m *AmberFlo) LoadConfig(path string) error {
func (m *Amberflo) LoadConfig(path string) error {
cfgs := map[string]amberFloConfig{}
data, err := ioutil.ReadFile(path)
if err != nil {
Expand All @@ -61,26 +61,24 @@ func (m *AmberFlo) LoadConfig(path string) error {
return nil
}

func (m *AmberFlo) NewMeter(user MeterUser) ApiMeter {
func (m *Amberflo) NewMeter(user MeterUser) ApiMeter {
return &amberFloMeter{
user: user,
mp: m,
}
}

func (m *AmberFlo) Close() error {
func (m *Amberflo) Close() error {
return m.client.Shutdown()
}

func (m *AmberFlo) Flush() error {
func (m *Amberflo) Flush() error {
// metering.Flush() // in API docs but not in library
time.Sleep(m.interval)
return nil
}

func (m *AmberFlo) getValue(user MeterUser, meterName string) (float64, bool) {
// TODO: batch and cache
// TODO: time period and aggregation is hardcoded as 1 day
func (m *Amberflo) getValue(user MeterUser, meterName string, startTime time.Time, endTime time.Time, checkDims Dimensions) (float64, bool) {
cfg, ok := m.getcfg(meterName)
if !ok {
return 0, false
Expand All @@ -92,17 +90,34 @@ func (m *AmberFlo) getValue(user MeterUser, meterName string) (float64, bool) {
if cfg.Name == "" {
return 0, false
}

startTimeInSeconds := (time.Now().In(time.UTC).UnixNano() / int64(time.Second)) - (24 * 60 * 60)
timeRange := &metering.TimeRange{
StartTimeInSeconds: startTimeInSeconds,
StartTimeInSeconds: startTime.In(time.UTC).Unix(),
EndTimeInSeconds: endTime.In(time.UTC).Unix(),
}
if timeRange.EndTimeInSeconds > time.Now().In(time.UTC).Unix() {
timeRange.EndTimeInSeconds = 0
}

filter := make(map[string][]string)
filter["customerId"] = []string{customerId}
for _, dim := range checkDims {
filter[dim.Key] = []string{dim.Value}
}

timeGroupingInterval := metering.Hour
switch timeSpan := endTime.Unix() - startTime.Unix(); {
case timeSpan > 24*60*60:
timeGroupingInterval = metering.Month
case timeSpan > 60*60:
timeGroupingInterval = metering.Day
default:
timeGroupingInterval = metering.Hour
}

usageResult, err := m.usageClient.GetUsage(&metering.UsagePayload{
MeterApiName: cfg.Name,
Aggregation: metering.Sum,
TimeGroupingInterval: metering.Day,
TimeGroupingInterval: timeGroupingInterval,
GroupBy: []string{"customerId"},
TimeRange: timeRange,
Filter: filter,
Expand All @@ -111,16 +126,19 @@ func (m *AmberFlo) getValue(user MeterUser, meterName string) (float64, bool) {
log.Error().Err(err).Str("user", user.ID()).Msg("could not get value")
return 0, false
}
// jj, _ := json.Marshal(&usageResult)
// fmt.Println("usageResult:", string(jj))

if usageResult == nil || len(usageResult.ClientMeters) == 0 || len(usageResult.ClientMeters[0].Values) == 0 {
log.Error().Err(err).Str("user", user.ID()).Msg("could not get value; no client value meter")
return 0, false
}
cm := usageResult.ClientMeters[0].Values
cmv := cm[len(cm)-1].Value
return cmv, true

total := usageResult.ClientMeters[0].GroupValue
return total, true
}

func (m *AmberFlo) sendMeter(user MeterUser, meterName string, value float64, extraDimensions map[string]string) error {
func (m *Amberflo) sendMeter(user MeterUser, meterName string, value float64, extraDimensions Dimensions) error {
cfg, ok := m.getcfg(meterName)
if !ok {
return nil
Expand All @@ -132,24 +150,24 @@ func (m *AmberFlo) sendMeter(user MeterUser, meterName string, value float64, ex
}
uniqueId := uuid.NewRandom().String()
utcMillis := time.Now().In(time.UTC).UnixNano() / int64(time.Millisecond)
dimensions := map[string]string{}
for k, v := range cfg.Dimensions {
dimensions[k] = v
amberFloDims := map[string]string{}
for _, v := range cfg.Dimensions {
amberFloDims[v.Key] = v.Value
}
for k, v := range extraDimensions {
dimensions[k] = v
for _, v := range extraDimensions {
amberFloDims[v.Key] = v.Value
}
return m.client.Meter(&metering.MeterMessage{
MeterApiName: cfg.Name,
UniqueId: uniqueId,
MeterTimeInMillis: utcMillis,
CustomerId: customerId,
MeterValue: value,
Dimensions: dimensions,
Dimensions: amberFloDims,
})
}

func (m *AmberFlo) getCustomerID(cfg amberFloConfig, user MeterUser) (string, bool) {
func (m *Amberflo) getCustomerID(cfg amberFloConfig, user MeterUser) (string, bool) {
customerId := cfg.DefaultUser
if user != nil {
eidKey := cfg.ExternalIDKey
Expand All @@ -166,7 +184,7 @@ func (m *AmberFlo) getCustomerID(cfg amberFloConfig, user MeterUser) (string, bo
return customerId, customerId != ""
}

func (m *AmberFlo) getcfg(meterName string) (amberFloConfig, bool) {
func (m *Amberflo) getcfg(meterName string) (amberFloConfig, bool) {
cfg, ok := m.cfgs[meterName]
if !ok {
cfg = amberFloConfig{
Expand All @@ -183,41 +201,34 @@ func (m *AmberFlo) getcfg(meterName string) (amberFloConfig, bool) {
//////////

type amberFloMeter struct {
user MeterUser
dims []string
mp *AmberFlo
user MeterUser
addDims []eventAddDim
mp *Amberflo
}

func (m *amberFloMeter) Meter(meterName string, value float64, extraDimensions map[string]string) error {
var dm2 map[string]string
if len(extraDimensions) > 0 || len(m.dims) > 0 {
dm2 = map[string]string{}
}
for k, v := range extraDimensions {
dm2[k] = v
}
for i := 0; i < len(m.dims); i += 3 {
a := m.dims[i]
k := m.dims[i+1]
v := m.dims[i+2]
if a == "" || a == meterName {
dm2[k] = v
func (m *amberFloMeter) Meter(meterName string, value float64, extraDimensions Dimensions) error {
var eventDims []Dimension
// Copy in matching dimensions set through AddDimension
for _, addDim := range m.addDims {
if addDim.MeterName == meterName {
eventDims = append(eventDims, Dimension{Key: addDim.Key, Value: addDim.Value})
}
}
eventDims = append(eventDims, extraDimensions...)
log.Trace().
Str("user", m.user.ID()).
Str("meter", meterName).
Float64("meter_value", value).
Msg("meter")
return m.mp.sendMeter(m.user, meterName, value, dm2)
return m.mp.sendMeter(m.user, meterName, value, eventDims)
}

func (m *amberFloMeter) AddDimension(meterName string, key string, value string) {
m.dims = append(m.dims, meterName, key, value)
m.addDims = append(m.addDims, eventAddDim{MeterName: meterName, Key: key, Value: value})
}

func (m *amberFloMeter) GetValue(meterName string) (float64, bool) {
return m.mp.getValue(m.user, meterName)
func (m *amberFloMeter) GetValue(meterName string, startTime time.Time, endTime time.Time, dims Dimensions) (float64, bool) {
return m.mp.getValue(m.user, meterName, startTime, endTime, dims)
}

/////////
Expand Down
49 changes: 26 additions & 23 deletions internal/meters/amberflo_test.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
package meters

import (
"errors"
"os"
"testing"
"time"

"github.com/interline-io/transitland-server/internal/testutil"
)

type amberfloTestUser struct {
name string
}

func (u *amberfloTestUser) ID() string {
return u.name
}

func (u *amberfloTestUser) GetExternalData(eid string) (string, bool) {
// must match key given in config below
if eid == "amberflo" {
return u.name, true
func TestAmberfloMeter(t *testing.T) {
mp, testConfig, err := getTestAmberfloMeter()
if err != nil {
t.Skip(err.Error())
return
}
return "", false
testMeter(t, mp, testConfig)
}

func TestAmberFloMeter(t *testing.T) {
func getTestAmberfloMeter() (*Amberflo, testMeterConfig, error) {
checkKeys := []string{
"TL_TEST_AMBERFLO_APIKEY",
"TL_TEST_AMBERFLO_METER1",
Expand All @@ -36,19 +30,28 @@ func TestAmberFloMeter(t *testing.T) {
for _, k := range checkKeys {
_, a, ok := testutil.CheckEnv(k)
if !ok {
t.Skip(a)
return
return nil, testMeterConfig{}, errors.New(a)
}
}
eidKey := "amberflo"
testConfig := testMeterConfig{
testMeter1: os.Getenv("TL_TEST_AMBERFLO_METER1"),
testMeter2: os.Getenv("TL_TEST_AMBERFLO_METER2"),
user1: &amberfloTestUser{name: os.Getenv("TL_TEST_AMBERFLO_USER1")},
user2: &amberfloTestUser{name: os.Getenv("TL_TEST_AMBERFLO_USER2")},
user3: &amberfloTestUser{name: os.Getenv("TL_TEST_AMBERFLO_USER3")},
user1: &testUser{
name: os.Getenv("TL_TEST_AMBERFLO_USER1"),
data: map[string]string{eidKey: os.Getenv("TL_TEST_AMBERFLO_USER1")},
},
user2: &testUser{
name: os.Getenv("TL_TEST_AMBERFLO_USER2"),
data: map[string]string{eidKey: os.Getenv("TL_TEST_AMBERFLO_USER2")},
},
user3: &testUser{
name: os.Getenv("TL_TEST_AMBERFLO_USER3"),
data: map[string]string{eidKey: os.Getenv("TL_TEST_AMBERFLO_USER3")},
},
}
mp := NewAmberFlo(os.Getenv("TL_TEST_AMBERFLO_APIKEY"), 1*time.Second, 1)
mp.cfgs[testConfig.testMeter1] = amberFloConfig{Name: testConfig.testMeter1, ExternalIDKey: "amberflo"}
mp.cfgs[testConfig.testMeter2] = amberFloConfig{Name: testConfig.testMeter2, ExternalIDKey: "amberflo"}
testMeter(t, mp, testConfig)
mp := NewAmberflo(os.Getenv("TL_TEST_AMBERFLO_APIKEY"), 1*time.Second, 1)
mp.cfgs[testConfig.testMeter1] = amberFloConfig{Name: testConfig.testMeter1, ExternalIDKey: eidKey}
mp.cfgs[testConfig.testMeter2] = amberFloConfig{Name: testConfig.testMeter2, ExternalIDKey: eidKey}
return mp, testConfig, nil
}
Loading

0 comments on commit 223339b

Please sign in to comment.