Skip to content

Commit

Permalink
Fix entitlement history bugs (#2160)
Browse files Browse the repository at this point in the history
  • Loading branch information
GAlexIHU authored Jan 29, 2025
1 parent 15e0c35 commit c0f107d
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 86 deletions.
65 changes: 55 additions & 10 deletions openmeter/credit/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,42 +147,87 @@ func (m *connector) GetBalanceHistoryOfOwner(ctx context.Context, owner grant.Na
periods := SortedPeriodsFromDedupedTimes(times)
historySegments := make([]engine.GrantBurnDownHistorySegment, 0, len(periods))

// collect al history segments through all periods
// For each period we'll have to calculate separately as we cannot calculate across resets.
// For each period, we will:
// 1. Find the last valid snapshot before the period start (might be at or before the period start)
// 2. Calculate the balance at the period start
// 3. Calculate the balance through the period
for _, period := range periods {
// get last valid grantbalances at start of period (eq balance at start of period)
balance, err := m.getLastValidBalanceSnapshotForOwnerAt(ctx, owner, period.From)
// Get last valid BalanceSnapshot before (or at) the period start
snap, err := m.getLastValidBalanceSnapshotForOwnerAt(ctx, owner, period.From)
if err != nil {
return engine.GrantBurnDownHistory{}, err
}

if period.From.Before(balance.At) {
if period.From.Before(snap.At) {
// This is an inconsistency check. It can only happen if we lost our snapshot for the reset.
//
// The engine doesn't manage rollovers at usage reset so it cannot be used to calculate GrantBurnDown across resets.
// FIXME: this is theoretically possible, we need to handle it, add capability to ledger.
return engine.GrantBurnDownHistory{}, fmt.Errorf("current period start %s is before last valid balance snapshot at %s, no snapshot was created for reset", period.From, balance.At)
return engine.GrantBurnDownHistory{}, fmt.Errorf("current period start %s is before last valid balance snapshot at %s, no snapshot was created for reset", period.From, snap.At)
}

// First, let's calculate the balance from the last snapshot until the start of the period

// get all relevant grants
grants, err := m.grantRepo.ListActiveGrantsBetween(ctx, owner, period.From, period.To)
grants, err := m.grantRepo.ListActiveGrantsBetween(ctx, owner, snap.At, period.From)
if err != nil {
return engine.GrantBurnDownHistory{}, err
}

// These grants might not be present in the starting balance so lets fill them
// This is only possible in case the grant becomes active exactly at the start of the current period
m.populateBalanceSnapshotWithMissingGrantsActiveAt(&balance, grants, period.From)
m.populateBalanceSnapshotWithMissingGrantsActiveAt(&snap, grants, snap.At)

periodFromSnapshotToPeriodStart := recurrence.Period{
From: snap.At,
To: period.From,
}

eng, err := m.buildEngineForOwner(ctx, owner, periodFromSnapshotToPeriodStart)
if err != nil {
return engine.GrantBurnDownHistory{}, err
}

balances, overage, _, err := eng.Run(
ctx,
grants,
snap.Balances,
snap.Overage,
periodFromSnapshotToPeriodStart,
)
if err != nil {
return engine.GrantBurnDownHistory{}, fmt.Errorf("failed to calculate balance for owner %s at %s: %w", owner.ID, period.From, err)
}

fakeSnapshotForPeriodStart := balance.Snapshot{
Balances: balances,
Overage: overage,
At: period.From,
}

// Second, lets calculate the balance for the period

// get all relevant grants
grants, err = m.grantRepo.ListActiveGrantsBetween(ctx, owner, period.From, period.To)
if err != nil {
return engine.GrantBurnDownHistory{}, err
}

eng, err := m.buildEngineForOwner(ctx, owner, period)
// These grants might not be present in the starting balance so lets fill them
// This is only possible in case the grant becomes active exactly at the start of the current period
m.populateBalanceSnapshotWithMissingGrantsActiveAt(&fakeSnapshotForPeriodStart, grants, period.From)

eng, err = m.buildEngineForOwner(ctx, owner, period)
if err != nil {
return engine.GrantBurnDownHistory{}, err
}

_, _, segments, err := eng.Run(
ctx,
grants,
balance.Balances,
balance.Overage,
fakeSnapshotForPeriodStart.Balances,
fakeSnapshotForPeriodStart.Overage,
period,
)
if err != nil {
Expand Down
71 changes: 56 additions & 15 deletions openmeter/entitlement/metered/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/openmeterio/openmeter/openmeter/credit/engine"
"github.com/openmeterio/openmeter/openmeter/credit/grant"
"github.com/openmeterio/openmeter/openmeter/entitlement"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/pkg/clock"
"github.com/openmeterio/openmeter/pkg/convert"
"github.com/openmeterio/openmeter/pkg/models"
Expand Down Expand Up @@ -151,24 +152,28 @@ func (e *connector) GetEntitlementBalanceHistory(ctx context.Context, entitlemen
return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("failed to get balance history: %w", err)
}

// 2. and we get the windowed usage data
meterQuery := ownerMeter.DefaultParams
meterQuery.FilterSubject = []string{ownerMeter.SubjectKey}
meterQuery.From = params.From
meterQuery.To = params.To
meterQuery.WindowSize = convert.ToPointer(models.WindowSize(params.WindowSize))
meterQuery.WindowTimeZone = &params.WindowTimeZone
getBaseQuery := func() streaming.QueryParams {
base := ownerMeter.DefaultParams

base.FilterSubject = []string{ownerMeter.SubjectKey}
base.From = params.From
base.To = params.To
base.WindowSize = convert.ToPointer(models.WindowSize(params.WindowSize))
base.WindowTimeZone = &params.WindowTimeZone

return base
}

meterRows, err := e.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.Meter, meterQuery)
// 2. and we get the windowed usage data
meterRows, err := e.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.Meter, getBaseQuery())
if err != nil {
return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("failed to query meter: %w", err)
}

// If we get 0 rows that means the windowsize is larger than the queried period.
// In this case we simply query for the entire period.
if len(meterRows) == 0 {
nonWindowedParams := meterQuery
nonWindowedParams.FilterSubject = []string{ownerMeter.SubjectKey}
nonWindowedParams := getBaseQuery()
nonWindowedParams.WindowSize = nil
nonWindowedParams.WindowTimeZone = nil
meterRows, err = e.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.Meter, nonWindowedParams)
Expand All @@ -179,13 +184,14 @@ func (e *connector) GetEntitlementBalanceHistory(ctx context.Context, entitlemen

// 3. and then we merge the two

// convert history segments to list of point in time balances
// convert history segments to list of point-in-time balances
segments := burndownHistory.Segments()

if len(segments) == 0 {
return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("returned history is empty")
}

// We'll use these balances to continuously deduct usage from
timestampedBalances := make([]struct {
balance float64
overage float64
Expand All @@ -203,23 +209,58 @@ func (e *connector) GetEntitlementBalanceHistory(ctx context.Context, entitlemen
})
}

visited := make(map[int]bool)

// we'll create a window for each row (same windowsize)
windows := make([]EntitlementBalanceHistoryWindow, 0, len(meterRows))
for _, row := range meterRows {
// find the last timestamped balance that was not later than the row
// This is not effective on a lot of rows
tsBalance, ok := slicesx.First(timestampedBalances, func(tsb struct {
// Lets find the last timestamped balance that was no later than the row
tsBalance, idx, ok := slicesx.Last(timestampedBalances, func(tsb struct {
balance float64
overage float64
timestamp time.Time
},
) bool {
return tsb.timestamp.Before(row.WindowStart) || tsb.timestamp.Equal(row.WindowStart)
}, true)
})
if !ok {
return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("no balance found for time %s", row.WindowStart.Format(time.RFC3339))
}

// If this is the first time we're using this `tsBalance`, we need to account for the usage between it's time and the row's time
if !visited[idx] {
// We need to query the usage between the two timestamps
params := getBaseQuery()
params.From = &tsBalance.timestamp
params.To = &row.WindowStart
params.WindowSize = nil
params.WindowTimeZone = nil

rows, err := e.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.Meter, params)
if err != nil {
return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("failed to query meter: %w", err)
}

// We should have 1 row
if len(rows) != 1 {
return nil, engine.GrantBurnDownHistory{}, fmt.Errorf("expected 1 row, got %d", len(rows))
}

// deduct balance and increase overage if needed
usage := rows[0].Value

balanceAtEnd := math.Max(0, tsBalance.balance-usage)
deductedUsage := tsBalance.balance - balanceAtEnd
overage := usage - deductedUsage + tsBalance.overage

// update
tsBalance.balance = balanceAtEnd
tsBalance.overage = overage
}

// Let's mark this balance as visited
visited[idx] = true

window := EntitlementBalanceHistoryWindow{
From: row.WindowStart.In(&params.WindowTimeZone),
To: row.WindowEnd.In(&params.WindowTimeZone),
Expand Down
Loading

0 comments on commit c0f107d

Please sign in to comment.