Skip to content

Commit

Permalink
Merge pull request #1400 from openmeterio/feat/fix-balance-worker-errors
Browse files Browse the repository at this point in the history
Feat/fix balance worker errors
  • Loading branch information
turip authored Aug 21, 2024
2 parents 28ec3a0 + 5115e6a commit b3483c3
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
4 changes: 4 additions & 0 deletions internal/entitlement/adapter/entitlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ func (a *entitlementDBAdapter) ListEntitlements(ctx context.Context, params enti
})...))
}

if len(params.IDs) > 0 {
query = query.Where(db_entitlement.IDIn(params.IDs...))
}

if len(params.FeatureIDsOrKeys) > 0 {
var ep predicate.Entitlement
for i, idOrKey := range params.FeatureIDsOrKeys {
Expand Down
36 changes: 29 additions & 7 deletions internal/entitlement/balanceworker/entitlementhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,39 @@ func (w *Worker) handleEntitlementUpdateEvent(ctx context.Context, entitlementID
}

func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID NamespacedID, source string, calculatedAt time.Time) (marshaler.Event, error) {
entitlement, err := w.entitlement.Entitlement.GetEntitlement(ctx, entitlementID.Namespace, entitlementID.ID)
entitlements, err := w.entitlement.Entitlement.ListEntitlements(ctx, entitlement.ListEntitlementsParams{
Namespaces: []string{entitlementID.Namespace},
IDs: []string{entitlementID.ID},
IncludeDeleted: true,
})
if err != nil {
return nil, fmt.Errorf("failed to get entitlement: %w", err)
}

feature, err := w.entitlement.Feature.GetFeature(ctx, entitlementID.Namespace, entitlement.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
if len(entitlements.Items) == 0 {
return nil, fmt.Errorf("entitlement not found: %s", entitlementID.ID)
}

if len(entitlements.Items) > 1 {
return nil, fmt.Errorf("multiple entitlements found: %s", entitlementID.ID)
}

entitlementEntity := &entitlements.Items[0]
if entitlementEntity.DeletedAt != nil {
// entitlement got deleted while processing changes => let's create a delete event so that we are not working
// on entitlement updates that are not relevant anymore
return w.handleEntitlementDeleteEvent(ctx, entitlement.EntitlementDeletedEvent{
Entitlement: *entitlementEntity,
Namespace: models.NamespaceID{ID: entitlementID.Namespace},
})
}

feature, err := w.entitlement.Feature.GetFeature(ctx, entitlementID.Namespace, entitlementEntity.FeatureID, productcatalog.IncludeArchivedFeatureTrue)
if err != nil {
return nil, fmt.Errorf("failed to get feature: %w", err)
}

value, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementID.Namespace, entitlement.SubjectKey, entitlement.ID, calculatedAt)
value, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementID.Namespace, entitlementEntity.SubjectKey, entitlementEntity.ID, calculatedAt)
if err != nil {
return nil, fmt.Errorf("failed to get entitlement value: %w", err)
}
Expand All @@ -104,10 +126,10 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
}

subject := models.Subject{
Key: entitlement.SubjectKey,
Key: entitlementEntity.SubjectKey,
}
if w.opts.SubjectResolver != nil {
subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, entitlementID.Namespace, entitlement.SubjectKey)
subject, err = w.opts.SubjectResolver.GetSubjectByKey(ctx, entitlementID.Namespace, entitlementEntity.SubjectKey)
if err != nil {
return nil, fmt.Errorf("failed to get subject ID: %w", err)
}
Expand All @@ -116,7 +138,7 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
event := marshaler.WithSource(
source,
snapshot.SnapshotEvent{
Entitlement: *entitlement,
Entitlement: *entitlementEntity,
Namespace: models.NamespaceID{
ID: entitlementID.Namespace,
},
Expand All @@ -127,7 +149,7 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac
CalculatedAt: &calculatedAt,

Value: convert.ToPointer((snapshot.EntitlementValue)(mappedValues)),
CurrentUsagePeriod: entitlement.CurrentUsagePeriod,
CurrentUsagePeriod: entitlementEntity.CurrentUsagePeriod,
},
)

Expand Down
1 change: 1 addition & 0 deletions internal/entitlement/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (o ListEntitlementsOrderBy) StrValues() []string {
}

type ListEntitlementsParams struct {
IDs []string
Namespaces []string
SubjectKeys []string
FeatureIDs []string
Expand Down

0 comments on commit b3483c3

Please sign in to comment.