Skip to content

Commit

Permalink
expiring_commitments: change discovery task to read only.
Browse files Browse the repository at this point in the history
Add short-term notified query into process task
Fix: Bug that already notified commitments would be notified on another run
Add a unit test for the case above
  • Loading branch information
VoigtS committed Feb 24, 2025
1 parent 53d0ecc commit 59d1d80
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
44 changes: 19 additions & 25 deletions internal/collector/expiring_commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (

const (
expiringCommitmentsNoticePeriod = 28 * 24 * time.Hour // 4 weeks
nextSumbissionInteval = 3 * time.Minute
nextSumbissionInteval = 2 * time.Minute
)

// Add commitments that are about to expire within the next month into the mail queue.
Expand All @@ -55,21 +55,22 @@ func (c *Collector) ExpiringCommitmentNotificationJob(registerer prometheus.Regi
}

type ExpiringCommitments struct {
Notifications map[db.ProjectID][]datamodel.CommitmentInfo
NextSubmission time.Time
Notifications map[db.ProjectID][]datamodel.CommitmentInfo
NextSubmission time.Time
ShortTermCommitments []db.ProjectCommitmentID // to be excluded from mail notifications.
}

var (
findExpiringCommitments = sqlext.SimplifyWhitespace(`
findExpiringCommitmentsQuery = sqlext.SimplifyWhitespace(`
SELECT ps.project_id, ps.type, pr.name, par.az, pc.id, pc.creator_name, pc.amount, pc.duration, pc.expires_at
FROM project_services ps
JOIN project_resources pr ON pr.service_id = ps.id
JOIN project_az_resources par ON par.resource_id = pr.id
JOIN project_commitments pc ON pc.az_resource_id = par.id
WHERE pc.expires_at <= $1
WHERE pc.expires_at <= $1 AND NOT pc.notified_for_expiration
ORDER BY ps.type, pr.name, par.az ASC, pc.amount DESC;
`)
updateCommitmentAsNotified = sqlext.SimplifyWhitespace(`
updateCommitmentAsNotifiedQuery = sqlext.SimplifyWhitespace(`
UPDATE project_commitments SET notified_for_expiration = true WHERE id = $1;
`)
)
Expand All @@ -83,7 +84,7 @@ func (c *Collector) discoverExpiringCommitments(_ context.Context, _ prometheus.
}

var shortTermCommitments []db.ProjectCommitmentID
err := sqlext.ForeachRow(c.DB, findExpiringCommitments, []any{cutoff}, func(rows *sql.Rows) error {
err := sqlext.ForeachRow(c.DB, findExpiringCommitmentsQuery, []any{cutoff}, func(rows *sql.Rows) error {
var pid db.ProjectID
var info datamodel.CommitmentInfo
err := rows.Scan(
Expand All @@ -106,22 +107,7 @@ func (c *Collector) discoverExpiringCommitments(_ context.Context, _ prometheus.
return ExpiringCommitments{}, err
}

// mark short-term commitments as notified without queueing them.
tx, err := c.DB.Begin()
if err != nil {
return ExpiringCommitments{}, err
}
for _, shortTerm := range shortTermCommitments {
_, err = tx.Exec(updateCommitmentAsNotified, shortTerm)
if err != nil {
return ExpiringCommitments{}, err
}
}
err = tx.Commit()

if err != nil {
return ExpiringCommitments{}, err
}
commitments.ShortTermCommitments = shortTermCommitments

return commitments, nil
}
Expand All @@ -131,11 +117,19 @@ func (c *Collector) processExpiringCommitmentTask(ctx context.Context, task Expi
if err != nil {
return err
}
defer sqlext.RollbackUnlessCommitted(tx)

// mark short-term commitments as notified without queueing them.
for _, shortTermCommitment := range task.ShortTermCommitments {
_, err = tx.Exec(updateCommitmentAsNotifiedQuery, shortTermCommitment)
if err != nil {
return err
}
}

// sort notifications per project_id in order to have consistent unit tests
projectIDs := slices.Collect(maps.Keys(task.Notifications))
sort.Slice(projectIDs, func(i, j int) bool { return projectIDs[i] < projectIDs[j] })

for _, projectID := range projectIDs {
var mailInfo datamodel.MailInfo
commitments := task.Notifications[projectID]
Expand All @@ -155,7 +149,7 @@ func (c *Collector) processExpiringCommitmentTask(ctx context.Context, task Expi
}

for _, c := range commitments {
_, err = tx.Exec(updateCommitmentAsNotified, c.Commitment.ID)
_, err = tx.Exec(updateCommitmentAsNotifiedQuery, c.Commitment.ID)
if err != nil {
return err
}
Expand Down
22 changes: 17 additions & 5 deletions internal/collector/expiring_commitments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package collector
import (
"testing"
"text/template"
"time"

"github.com/sapcc/go-bits/easypg"

Expand Down Expand Up @@ -68,17 +67,30 @@ func Test_ExpiringCommitmentNotification(t *testing.T) {
UPDATE project_commitments SET notified_for_expiration = TRUE WHERE id = 9 AND transfer_token = NULL;
INSERT INTO project_mail_notifications (id, project_id, subject, body, next_submission_at) VALUES (1, 1, 'Information about expiring commitments', 'Domain:germany Project:berlin Creator:dummy Amount:5 Duration:1 year Date:1970-01-01 Service:first Resource:things AZ:az-one Creator:dummy Amount:10 Duration:1 year Date:1970-01-01 Service:first Resource:things AZ:az-two', %[1]d);
INSERT INTO project_mail_notifications (id, project_id, subject, body, next_submission_at) VALUES (2, 2, 'Information about expiring commitments', 'Domain:germany Project:dresden Creator:dummy Amount:5 Duration:1 year Date:1970-01-27 Service:first Resource:things AZ:az-one Creator:dummy Amount:10 Duration:1 year Date:1970-01-27 Service:first Resource:things AZ:az-two', %[1]d);
`, c.MeasureTime().Add(3*time.Minute).Unix())
`, c.MeasureTime().Add(nextSumbissionInteval).Unix())

// mail queue with an empty template should fail
s.Cluster.MailTemplates = core.MailTemplates{ConfirmedCommitments: template.New("")}
err := (job.ProcessOne(s.Ctx))
mailTemplates := s.Cluster.MailTemplates
s.Cluster.MailTemplates = core.MailTemplates{ExpiringCommitments: template.New("")}
// commitments that are already sent out for a notification are not visible in the result set anymore - a new one gets created.
_, err := s.DB.Exec("INSERT INTO project_commitments (id, az_resource_id, amount, created_at, creator_uuid, creator_name, duration, expires_at, state) VALUES (99, 1, 10, UNIX(0), 'dummy', 'dummy', '1 year', UNIX(0), 'expired');")
tr.DBChanges().Ignore()
mustT(t, err)
err = (job.ProcessOne(s.Ctx))
if err == nil {
t.Fatal("execution without mail template must fail")
}
s.Cluster.MailTemplates = core.MailTemplates{ConfirmedCommitments: nil}
s.Cluster.MailTemplates = core.MailTemplates{ExpiringCommitments: nil}
err = (job.ProcessOne(s.Ctx))
if err == nil {
t.Fatal("execution without mail template must fail")
}

// create a notification for the created commitment. Do not send another notification for commitments that are already marked as notified.
s.Cluster.MailTemplates = mailTemplates
mustT(t, job.ProcessOne(s.Ctx))
tr.DBChanges().AssertEqualf(`
UPDATE project_commitments SET notified_for_expiration = TRUE WHERE id = 99 AND transfer_token = NULL;
INSERT INTO project_mail_notifications (id, project_id, subject, body, next_submission_at) VALUES (3, 1, 'Information about expiring commitments', 'Domain:germany Project:berlin Creator:dummy Amount:10 Duration:1 year Date:1970-01-01 Service:first Resource:things AZ:az-one', %d);
`, c.MeasureTime().Add(nextSumbissionInteval).Unix())
}

0 comments on commit 59d1d80

Please sign in to comment.