Skip to content

Commit

Permalink
feat: only syncronize lines that have changed (#2145)
Browse files Browse the repository at this point in the history
  • Loading branch information
turip authored Jan 24, 2025
1 parent fac89e6 commit 1481fff
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 8 deletions.
40 changes: 33 additions & 7 deletions openmeter/billing/worker/subscription/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,13 @@ func (h *Handler) inScopeLinePatches(existingLine *billing.Line, expectedLine *b
return nil, nil
}

mergedLine, wasChange := h.mergeChangesFromLine(h.cloneLineForUpsert(existingLine), expectedLine)
if !wasChange {
return nil, nil
}

return []linePatch{
patchFromLine(patchOpUpdate, h.mergeChangesFromLine(h.cloneLineForUpsert(existingLine), expectedLine)),
patchFromLine(patchOpUpdate, mergedLine),
}, nil
}

Expand All @@ -521,6 +526,11 @@ func (h *Handler) inScopeLinePatches(existingLine *billing.Line, expectedLine *b
// TODO[later]: When we implement progressive billing based pro-rating, we need to support adjusting flat fee
// segments here.

if existingLine.Period.End.Equal(expectedLine.Period.End) {
// The line is already in the expected state, so we can safely return here
return nil, nil
}

patches := []linePatch{}

switch {
Expand Down Expand Up @@ -610,20 +620,36 @@ func (h *Handler) inScopeLinePatches(existingLine *billing.Line, expectedLine *b
return nil, fmt.Errorf("could not handle line update [lineID=%s, status=%s]", existingLine.ID, existingLine.Status)
}

func (h *Handler) mergeChangesFromLine(existingLine *billing.Line, expectedLine *billing.Line) *billing.Line {
type typeWithEqual[T any] interface {
Equal(T) bool
}

func setIfDoesNotEqual[T typeWithEqual[T]](existing *T, expected T, wasChange *bool) {
if !(*existing).Equal(expected) {
*existing = expected
*wasChange = true
}
}

func (h *Handler) mergeChangesFromLine(existingLine *billing.Line, expectedLine *billing.Line) (*billing.Line, bool) {
// We assume that only the period can change, maybe some pricing data due to prorating (for flat lines)

existingLine.Period = expectedLine.Period
wasChange := false

setIfDoesNotEqual(&existingLine.Period, expectedLine.Period, &wasChange)
setIfDoesNotEqual(&existingLine.InvoiceAt, expectedLine.InvoiceAt, &wasChange)

existingLine.InvoiceAt = expectedLine.InvoiceAt
existingLine.DeletedAt = nil
if existingLine.DeletedAt != nil {
existingLine.DeletedAt = nil
wasChange = true
}

// Let's handle the flat fee prorating
if existingLine.Type == billing.InvoiceLineTypeFee {
existingLine.FlatFee.PerUnitAmount = expectedLine.FlatFee.PerUnitAmount
setIfDoesNotEqual(&existingLine.FlatFee.PerUnitAmount, expectedLine.FlatFee.PerUnitAmount, &wasChange)
}

return existingLine
return existingLine, wasChange
}

func (h *Handler) updateMutableInvoice(ctx context.Context, invoice billing.Invoice, patches []linePatch) error {
Expand Down
3 changes: 2 additions & 1 deletion openmeter/billing/worker/subscription/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func (s *SubscriptionHandlerTestSuite) TestSubscriptionHappyPath() {

// then there should be a gathering invoice
invoice := s.gatheringInvoice(ctx, namespace, s.Customer.ID)
invoiceUpdatedAt := invoice.UpdatedAt

s.Len(invoice.Lines.OrEmpty(), 1)

Expand All @@ -385,7 +386,7 @@ func (s *SubscriptionHandlerTestSuite) TestSubscriptionHappyPath() {

gatheringLine := gatheringInvoice.Lines.OrEmpty()[0]

// TODO[OM-1039]: the invoice's updated at gets updated even if the invoice is not changed
s.Equal(invoiceUpdatedAt, gatheringInvoice.UpdatedAt)
s.Equal(billing.InvoiceStatusGathering, gatheringInvoice.Status)
s.Equal(line.UpdatedAt, gatheringLine.UpdatedAt)
})
Expand Down

0 comments on commit 1481fff

Please sign in to comment.