diff --git a/openmeter/credit/balance.go b/openmeter/credit/balance.go index 502cc75cf..d1bf9a62a 100644 --- a/openmeter/credit/balance.go +++ b/openmeter/credit/balance.go @@ -147,33 +147,78 @@ func (m *connector) GetBalanceHistoryOfOwner(ctx context.Context, owner grant.Na periods := SortedPeriodsFromDedupedTimes(times) historySegments := make([]engine.GrantBurnDownHistorySegment, 0, len(periods)) - // collect al history segments through all periods + // For each period we'll have to calculate separately as we cannot calculate across resets. + // For each period, we will: + // 1. Find the last valid snapshot before the period start (might be at or before the period start) + // 2. Calculate the balance at the period start + // 3. Calculate the balance through the period for _, period := range periods { - // get last valid grantbalances at start of period (eq balance at start of period) - balance, err := m.getLastValidBalanceSnapshotForOwnerAt(ctx, owner, period.From) + // Get last valid BalanceSnapshot before (or at) the period start + snap, err := m.getLastValidBalanceSnapshotForOwnerAt(ctx, owner, period.From) if err != nil { return engine.GrantBurnDownHistory{}, err } - if period.From.Before(balance.At) { + if period.From.Before(snap.At) { // This is an inconsistency check. It can only happen if we lost our snapshot for the reset. // // The engine doesn't manage rollovers at usage reset so it cannot be used to calculate GrantBurnDown across resets. // FIXME: this is theoretically possible, we need to handle it, add capability to ledger. - return engine.GrantBurnDownHistory{}, fmt.Errorf("current period start %s is before last valid balance snapshot at %s, no snapshot was created for reset", period.From, balance.At) + return engine.GrantBurnDownHistory{}, fmt.Errorf("current period start %s is before last valid balance snapshot at %s, no snapshot was created for reset", period.From, snap.At) } + // First, let's calculate the balance from the last snapshot until the start of the period + // get all relevant grants - grants, err := m.grantRepo.ListActiveGrantsBetween(ctx, owner, period.From, period.To) + grants, err := m.grantRepo.ListActiveGrantsBetween(ctx, owner, snap.At, period.From) + if err != nil { + return engine.GrantBurnDownHistory{}, err + } + // These grants might not be present in the starting balance so lets fill them // This is only possible in case the grant becomes active exactly at the start of the current period - m.populateBalanceSnapshotWithMissingGrantsActiveAt(&balance, grants, period.From) + m.populateBalanceSnapshotWithMissingGrantsActiveAt(&snap, grants, snap.At) + + periodFromSnapshotToPeriodStart := recurrence.Period{ + From: snap.At, + To: period.From, + } + + eng, err := m.buildEngineForOwner(ctx, owner, periodFromSnapshotToPeriodStart) + if err != nil { + return engine.GrantBurnDownHistory{}, err + } + + balances, overage, _, err := eng.Run( + ctx, + grants, + snap.Balances, + snap.Overage, + periodFromSnapshotToPeriodStart, + ) + if err != nil { + return engine.GrantBurnDownHistory{}, fmt.Errorf("failed to calculate balance for owner %s at %s: %w", owner.ID, period.From, err) + } + + fakeSnapshotForPeriodStart := balance.Snapshot{ + Balances: balances, + Overage: overage, + At: period.From, + } + + // Second, lets calculate the balance for the period + // get all relevant grants + grants, err = m.grantRepo.ListActiveGrantsBetween(ctx, owner, period.From, period.To) if err != nil { return engine.GrantBurnDownHistory{}, err } - eng, err := m.buildEngineForOwner(ctx, owner, period) + // These grants might not be present in the starting balance so lets fill them + // This is only possible in case the grant becomes active exactly at the start of the current period + m.populateBalanceSnapshotWithMissingGrantsActiveAt(&fakeSnapshotForPeriodStart, grants, period.From) + + eng, err = m.buildEngineForOwner(ctx, owner, period) if err != nil { return engine.GrantBurnDownHistory{}, err } @@ -181,8 +226,8 @@ func (m *connector) GetBalanceHistoryOfOwner(ctx context.Context, owner grant.Na _, _, segments, err := eng.Run( ctx, grants, - balance.Balances, - balance.Overage, + fakeSnapshotForPeriodStart.Balances, + fakeSnapshotForPeriodStart.Overage, period, ) if err != nil { diff --git a/openmeter/entitlement/metered/balance.go b/openmeter/entitlement/metered/balance.go index 607e2955b..6c64fd0b8 100644 --- a/openmeter/entitlement/metered/balance.go +++ b/openmeter/entitlement/metered/balance.go @@ -10,6 +10,7 @@ import ( "github.com/openmeterio/openmeter/openmeter/credit/engine" "github.com/openmeterio/openmeter/openmeter/credit/grant" "github.com/openmeterio/openmeter/openmeter/entitlement" + "github.com/openmeterio/openmeter/openmeter/streaming" "github.com/openmeterio/openmeter/pkg/clock" "github.com/openmeterio/openmeter/pkg/convert" "github.com/openmeterio/openmeter/pkg/models" @@ -151,15 +152,20 @@ func (e *connector) GetEntitlementBalanceHistory(ctx context.Context, entitlemen return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("failed to get balance history: %w", err) } - // 2. and we get the windowed usage data - meterQuery := ownerMeter.DefaultParams - meterQuery.FilterSubject = []string{ownerMeter.SubjectKey} - meterQuery.From = params.From - meterQuery.To = params.To - meterQuery.WindowSize = convert.ToPointer(models.WindowSize(params.WindowSize)) - meterQuery.WindowTimeZone = ¶ms.WindowTimeZone + getBaseQuery := func() streaming.QueryParams { + base := ownerMeter.DefaultParams + + base.FilterSubject = []string{ownerMeter.SubjectKey} + base.From = params.From + base.To = params.To + base.WindowSize = convert.ToPointer(models.WindowSize(params.WindowSize)) + base.WindowTimeZone = ¶ms.WindowTimeZone + + return base + } - meterRows, err := e.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.Meter, meterQuery) + // 2. and we get the windowed usage data + meterRows, err := e.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.Meter, getBaseQuery()) if err != nil { return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("failed to query meter: %w", err) } @@ -167,8 +173,7 @@ func (e *connector) GetEntitlementBalanceHistory(ctx context.Context, entitlemen // If we get 0 rows that means the windowsize is larger than the queried period. // In this case we simply query for the entire period. if len(meterRows) == 0 { - nonWindowedParams := meterQuery - nonWindowedParams.FilterSubject = []string{ownerMeter.SubjectKey} + nonWindowedParams := getBaseQuery() nonWindowedParams.WindowSize = nil nonWindowedParams.WindowTimeZone = nil meterRows, err = e.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.Meter, nonWindowedParams) @@ -179,13 +184,14 @@ func (e *connector) GetEntitlementBalanceHistory(ctx context.Context, entitlemen // 3. and then we merge the two - // convert history segments to list of point in time balances + // convert history segments to list of point-in-time balances segments := burndownHistory.Segments() if len(segments) == 0 { return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("returned history is empty") } + // We'll use these balances to continuously deduct usage from timestampedBalances := make([]struct { balance float64 overage float64 @@ -203,23 +209,58 @@ func (e *connector) GetEntitlementBalanceHistory(ctx context.Context, entitlemen }) } + visited := make(map[int]bool) + // we'll create a window for each row (same windowsize) windows := make([]EntitlementBalanceHistoryWindow, 0, len(meterRows)) for _, row := range meterRows { - // find the last timestamped balance that was not later than the row - // This is not effective on a lot of rows - tsBalance, ok := slicesx.First(timestampedBalances, func(tsb struct { + // Lets find the last timestamped balance that was no later than the row + tsBalance, idx, ok := slicesx.Last(timestampedBalances, func(tsb struct { balance float64 overage float64 timestamp time.Time }, ) bool { return tsb.timestamp.Before(row.WindowStart) || tsb.timestamp.Equal(row.WindowStart) - }, true) + }) if !ok { return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("no balance found for time %s", row.WindowStart.Format(time.RFC3339)) } + // If this is the first time we're using this `tsBalance`, we need to account for the usage between it's time and the row's time + if !visited[idx] { + // We need to query the usage between the two timestamps + params := getBaseQuery() + params.From = &tsBalance.timestamp + params.To = &row.WindowStart + params.WindowSize = nil + params.WindowTimeZone = nil + + rows, err := e.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.Meter, params) + if err != nil { + return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("failed to query meter: %w", err) + } + + // We should have 1 row + if len(rows) != 1 { + return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("expected 1 row, got %d", len(rows)) + } + + // deduct balance and increase overage if needed + usage := rows[0].Value + + balanceAtEnd := math.Max(0, tsBalance.balance-usage) + deductedUsage := tsBalance.balance - balanceAtEnd + overage := usage - deductedUsage + tsBalance.overage + + // update + tsBalance.balance = balanceAtEnd + tsBalance.overage = overage + } + + // Let's mark this balance as visited + visited[idx] = true + window := EntitlementBalanceHistoryWindow{ From: row.WindowStart.In(¶ms.WindowTimeZone), To: row.WindowEnd.In(¶ms.WindowTimeZone), diff --git a/openmeter/entitlement/metered/balance_test.go b/openmeter/entitlement/metered/balance_test.go index 530f0d154..18d225663 100644 --- a/openmeter/entitlement/metered/balance_test.go +++ b/openmeter/entitlement/metered/balance_test.go @@ -5,7 +5,9 @@ import ( "testing" "time" + "github.com/samber/lo" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/openmeterio/openmeter/openmeter/credit/balance" "github.com/openmeterio/openmeter/openmeter/credit/grant" @@ -505,43 +507,88 @@ func TestGetEntitlementHistory(t *testing.T) { }) assert.NoError(t, err) - windowedHistory, burndownHistory, err := connector.GetEntitlementBalanceHistory(ctx, models.NamespacedID{Namespace: namespace, ID: ent.ID}, meteredentitlement.BalanceHistoryParams{ - From: &startTime, - To: &queryTime, - WindowTimeZone: *time.UTC, - WindowSize: meteredentitlement.WindowSizeHour, + t.Run("Should return correct value for the entire period", func(t *testing.T) { + windowedHistory, burndownHistory, err := connector.GetEntitlementBalanceHistory(ctx, models.NamespacedID{Namespace: namespace, ID: ent.ID}, meteredentitlement.BalanceHistoryParams{ + From: &startTime, + To: &queryTime, + WindowTimeZone: *time.UTC, + WindowSize: meteredentitlement.WindowSizeHour, + }) + require.NoError(t, err) + + assert.Len(t, windowedHistory, 12) + + // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Minute)) + assert.Equal(t, 100.0, windowedHistory[0].UsageInPeriod) + assert.Equal(t, 10000.0, windowedHistory[0].BalanceAtStart) + assert.Equal(t, 9900.0, windowedHistory[1].BalanceAtStart) + assert.Zero(t, startTime.Add(time.Hour).Compare(windowedHistory[1].From)) + // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour*2).Add(time.Minute)) + assert.Equal(t, 100.0, windowedHistory[2].UsageInPeriod) + assert.Equal(t, 9900.0, windowedHistory[2].BalanceAtStart) + // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour*3).Add(time.Minute)) + assert.Equal(t, 100.0, windowedHistory[3].UsageInPeriod) + assert.Equal(t, 19800.0, windowedHistory[3].BalanceAtStart) + assert.Equal(t, 19700.0, windowedHistory[4].BalanceAtStart) + // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour*5).Add(time.Minute)) + assert.Equal(t, 100.0, windowedHistory[5].UsageInPeriod) + assert.Equal(t, 19700.0, windowedHistory[5].BalanceAtStart) // even though EffectiveAt: startTime.Add(time.Hour * 5).Add(time.Minute * 30) grant happens here, it is only recognized at the next window + assert.Equal(t, 29600.0, windowedHistory[6].BalanceAtStart) + assert.Equal(t, 29600.0, windowedHistory[7].BalanceAtStart) + // deps.streaming.AddSimpleEvent(meterSlug, 1100, startTime.Add(time.Hour*8).Add(time.Minute)) + assert.Equal(t, 1100.0, windowedHistory[8].UsageInPeriod) + assert.Equal(t, 29600.0, windowedHistory[8].BalanceAtStart) + assert.Equal(t, 28500.0, windowedHistory[9].BalanceAtStart) + // deps.streaming.AddSimpleEvent(meterSlug, 100, queryTime.Add(-time.Second)) + assert.Equal(t, 100.0, windowedHistory[11].UsageInPeriod) + assert.Equal(t, 28500.0, windowedHistory[11].BalanceAtStart) + + // check returned burndownhistory + segments := burndownHistory.Segments() + assert.Len(t, segments, 3) }) - assert.NoError(t, err) - - assert.Len(t, windowedHistory, 12) - - // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Minute)) - assert.Equal(t, 100.0, windowedHistory[0].UsageInPeriod) - assert.Equal(t, 10000.0, windowedHistory[0].BalanceAtStart) - assert.Equal(t, 9900.0, windowedHistory[1].BalanceAtStart) - // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour*2).Add(time.Minute)) - assert.Equal(t, 100.0, windowedHistory[2].UsageInPeriod) - assert.Equal(t, 9900.0, windowedHistory[2].BalanceAtStart) - // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour*3).Add(time.Minute)) - assert.Equal(t, 100.0, windowedHistory[3].UsageInPeriod) - assert.Equal(t, 19800.0, windowedHistory[3].BalanceAtStart) - assert.Equal(t, 19700.0, windowedHistory[4].BalanceAtStart) - // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour*5).Add(time.Minute)) - assert.Equal(t, 100.0, windowedHistory[5].UsageInPeriod) - assert.Equal(t, 19700.0, windowedHistory[5].BalanceAtStart) // even though EffectiveAt: startTime.Add(time.Hour * 5).Add(time.Minute * 30) grant happens here, it is only recognized at the next window - assert.Equal(t, 29600.0, windowedHistory[6].BalanceAtStart) - assert.Equal(t, 29600.0, windowedHistory[7].BalanceAtStart) - // deps.streaming.AddSimpleEvent(meterSlug, 1100, startTime.Add(time.Hour*8).Add(time.Minute)) - assert.Equal(t, 1100.0, windowedHistory[8].UsageInPeriod) - assert.Equal(t, 29600.0, windowedHistory[8].BalanceAtStart) - assert.Equal(t, 28500.0, windowedHistory[9].BalanceAtStart) - // deps.streaming.AddSimpleEvent(meterSlug, 100, queryTime.Add(-time.Second)) - assert.Equal(t, 100.0, windowedHistory[11].UsageInPeriod) - assert.Equal(t, 28500.0, windowedHistory[11].BalanceAtStart) - // check returned burndownhistory - segments := burndownHistory.Segments() - assert.Len(t, segments, 3) + t.Run("Should return correct value if the queried period doesn't coincide with history breakpoints", func(t *testing.T) { + windowedHistory, burndownHistory, err := connector.GetEntitlementBalanceHistory(ctx, models.NamespacedID{Namespace: namespace, ID: ent.ID}, meteredentitlement.BalanceHistoryParams{ + From: lo.ToPtr(startTime.Add(time.Hour)), + To: &queryTime, + WindowTimeZone: *time.UTC, + WindowSize: meteredentitlement.WindowSizeHour, + }) + assert.NoError(t, err) + + // check returned burndownhistory + segments := burndownHistory.Segments() + assert.Len(t, segments, 3) + + assert.Zero(t, segments[0].From.Compare(startTime.Add(time.Hour))) + assert.Equal(t, 9900.0, segments[0].BalanceAtStart.Balance()) + + // check windowed history + assert.Len(t, windowedHistory, 11) + + assert.Zero(t, startTime.Add(time.Hour).Compare(windowedHistory[0].From)) + assert.Equal(t, 9900.0, windowedHistory[0].BalanceAtStart) + // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour*2).Add(time.Minute)) + assert.Equal(t, 100.0, windowedHistory[1].UsageInPeriod) + assert.Equal(t, 9900.0, windowedHistory[1].BalanceAtStart) + // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour*3).Add(time.Minute)) + assert.Equal(t, 100.0, windowedHistory[2].UsageInPeriod) + assert.Equal(t, 19800.0, windowedHistory[2].BalanceAtStart) + assert.Equal(t, 19700.0, windowedHistory[3].BalanceAtStart) + // deps.streaming.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour*5).Add(time.Minute)) + assert.Equal(t, 100.0, windowedHistory[4].UsageInPeriod) + assert.Equal(t, 19700.0, windowedHistory[4].BalanceAtStart) // even though EffectiveAt: startTime.Add(time.Hour * 5).Add(time.Minute * 30) grant happens here, it is only recognized at the next window + assert.Equal(t, 29600.0, windowedHistory[5].BalanceAtStart) + assert.Equal(t, 29600.0, windowedHistory[6].BalanceAtStart) + // deps.streaming.AddSimpleEvent(meterSlug, 1100, startTime.Add(time.Hour*8).Add(time.Minute)) + assert.Equal(t, 1100.0, windowedHistory[7].UsageInPeriod) + assert.Equal(t, 29600.0, windowedHistory[7].BalanceAtStart) + assert.Equal(t, 28500.0, windowedHistory[8].BalanceAtStart) + // deps.streaming.AddSimpleEvent(meterSlug, 100, queryTime.Add(-time.Second)) + assert.Equal(t, 100.0, windowedHistory[10].UsageInPeriod) + assert.Equal(t, 28500.0, windowedHistory[10].BalanceAtStart) + }) }, }, { @@ -639,6 +686,89 @@ func TestGetEntitlementHistory(t *testing.T) { assert.Len(t, segments, 1) }, }, + { + name: "Should return history if WINDOWSIZE and entitlements events dont align", + run: func(t *testing.T, connector meteredentitlement.Connector, deps *dependencies) { + ctx := context.Background() + startTime := testutils.GetRFC3339Time(t, "2024-03-01T00:00:00Z") + + // create featute in db + feature, err := deps.featureRepo.CreateFeature(ctx, exampleFeature) + assert.NoError(t, err) + + // create entitlement in db + inp := getEntitlement(t, feature) + inp.MeasureUsageFrom = &startTime + ent, err := deps.entitlementRepo.CreateEntitlement(ctx, inp) + assert.NoError(t, err) + + // We'll query with WINDOWSIZE_DAY, so lets use 12h precision for the different events + + // grant at start + _, err = deps.grantRepo.CreateGrant(ctx, grant.RepoCreateInput{ + OwnerID: grant.Owner(ent.ID), + Namespace: namespace, + Amount: 10000, + Priority: 1, + EffectiveAt: startTime, + ExpiresAt: startTime.AddDate(0, 0, 3), + }) + assert.NoError(t, err) + + // register usage for meter & feature + deps.streamingConnector.AddSimpleEvent(meterSlug, 100, startTime.Add(time.Hour)) + + // let's do a reset + resetTime := startTime.Add(time.Hour * 12) + _, err = connector.ResetEntitlementUsage(ctx, + models.NamespacedID{Namespace: namespace, ID: ent.ID}, + meteredentitlement.ResetEntitlementUsageParams{ + At: resetTime, + RetainAnchor: true, + }, + ) + assert.NoError(t, err) + + queryTime := startTime.AddDate(0, 0, 2) + + // register usage for meter & feature + deps.streamingConnector.AddSimpleEvent(meterSlug, 500, startTime.Add(time.Hour*11)) + deps.streamingConnector.AddSimpleEvent(meterSlug, 300, startTime.Add(time.Hour*18)) + deps.streamingConnector.AddSimpleEvent(meterSlug, 1100, startTime.Add(time.Hour*25)) + + // grant after the reset + _, err = deps.grantRepo.CreateGrant(ctx, grant.RepoCreateInput{ + OwnerID: grant.Owner(ent.ID), + Namespace: namespace, + Amount: 7000, + Priority: 1, + EffectiveAt: resetTime, + ExpiresAt: startTime.AddDate(0, 0, 3), + }) + assert.NoError(t, err) + + windowedHistory, burndownHistory, err := connector.GetEntitlementBalanceHistory(ctx, models.NamespacedID{Namespace: namespace, ID: ent.ID}, meteredentitlement.BalanceHistoryParams{ + To: &queryTime, + From: lo.ToPtr(startTime), + WindowTimeZone: *time.UTC, + WindowSize: meteredentitlement.WindowSizeDay, + }) + assert.NoError(t, err) + + // check returned burndownhistory + segments := burndownHistory.Segments() + assert.Len(t, segments, 2) + + assert.Len(t, windowedHistory, 2) + + // First Day + assert.Equal(t, 900.0, windowedHistory[0].UsageInPeriod) + assert.Equal(t, 10000.0, windowedHistory[0].BalanceAtStart) + // Second Day + assert.Equal(t, 1100.0, windowedHistory[1].UsageInPeriod) + assert.Equal(t, 6700.0, windowedHistory[1].BalanceAtStart) + }, + }, } for _, tc := range tt { diff --git a/pkg/slicesx/first.go b/pkg/slicesx/first.go deleted file mode 100644 index c804a2ccd..000000000 --- a/pkg/slicesx/first.go +++ /dev/null @@ -1,26 +0,0 @@ -package slicesx - -// returns the first element in the slice where the predicate returns true -// if second argument is true the returns the last not the first -func First[T any](s []T, f func(T) bool, last bool) (*T, bool) { - if s == nil { - return nil, false - } - - if last { - for i := len(s) - 1; i >= 0; i-- { - if f(s[i]) { - return &s[i], true - } - } - return nil, false - } - - for _, v := range s { - if f(v) { - return &v, true - } - } - - return nil, false -} diff --git a/pkg/slicesx/last.go b/pkg/slicesx/last.go new file mode 100644 index 000000000..0faf5d5ad --- /dev/null +++ b/pkg/slicesx/last.go @@ -0,0 +1,15 @@ +package slicesx + +// Returns the last element in the slice where the predicate returns true +func Last[T any](s []T, f func(T) bool) (*T, int, bool) { + if s == nil { + return nil, -1, false + } + + for i := len(s) - 1; i >= 0; i-- { + if f(s[i]) { + return &s[i], i, true + } + } + return nil, -1, false +}