Skip to content

Commit

Permalink
feat(notifier): refactor reschedule func
Browse files Browse the repository at this point in the history
  • Loading branch information
kissken committed Nov 8, 2023
1 parent 2e1545d commit 6150f5c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
12 changes: 12 additions & 0 deletions metrics/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,15 @@ func (metrics *NotifierMetrics) MarkSendersOkMetrics(contactType string) {
metric.Mark(1)
}
}

// MarkSendersFailedMetrics marks metrics as 1 by contactType when notifications were unsuccessfully sent.
func (metrics *NotifierMetrics) MarkSendersFailedMetrics(contactType string) {
if metric, found := metrics.SendersFailedMetrics.GetRegisteredMeter(contactType); found {
metric.Mark(1)
}
}

// MarkSendingFailed marks metrics when notifications were unsuccessfully sent.
func (metrics *NotifierMetrics) MarkSendingFailed() {
metrics.SendingFailed.Mark(1)
}
51 changes: 29 additions & 22 deletions notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func NewNotifier(database moira.Database, logger moira.Logger, config Config, me
func (notifier *StandardNotifier) Send(pkg *NotificationPackage, waitGroup *sync.WaitGroup) {
ch, found := notifier.senders[pkg.Contact.Type]
if !found {
notifier.resend(pkg, fmt.Sprintf("Unknown contact type '%s' [%s]", pkg.Contact.Type, pkg))
notifier.reschedule(pkg, fmt.Sprintf("Unknown contact type '%s' [%s]", pkg.Contact.Type, pkg))
return
}
waitGroup.Add(1)
Expand All @@ -117,7 +117,7 @@ func (notifier *StandardNotifier) Send(pkg *NotificationPackage, waitGroup *sync
case ch <- *pkg:
break
case <-time.After(notifier.config.SendingTimeout):
notifier.resend(pkg, fmt.Sprintf("Timeout sending %s", pkg))
notifier.reschedule(pkg, fmt.Sprintf("Timeout sending %s", pkg))
break
}
}(pkg)
Expand All @@ -137,36 +137,39 @@ func (notifier *StandardNotifier) GetReadBatchSize() int64 {
return notifier.config.ReadBatchSize
}

func (notifier *StandardNotifier) resend(pkg *NotificationPackage, reason string) {
func (notifier *StandardNotifier) reschedule(pkg *NotificationPackage, reason string) {
if pkg.DontResend {
notifier.metrics.MarkSendersDroppedNotifications(pkg.Contact.Type)
return
}
notifier.metrics.SendingFailed.Mark(1)
if metric, found := notifier.metrics.SendersFailedMetrics.GetRegisteredMeter(pkg.Contact.Type); found {
metric.Mark(1)
}

notifier.metrics.MarkSendingFailed()
notifier.metrics.MarkSendersFailedMetrics(pkg.Contact.Type)

logger := getLogWithPackageContext(&notifier.logger, pkg, &notifier.config)

if notifier.needToStop(pkg.FailCount) {
notifier.metrics.MarkSendersDroppedNotifications(pkg.Contact.Type)
logger.Error().
Msg("Stop resending. Notification interval is timed out")
return
}

logger.Warning().
Int("number_of_retires", pkg.FailCount).
String("reason", reason).
Msg("Can't send message. Retry again in 1 min")

if time.Duration(pkg.FailCount)*time.Minute > notifier.config.ResendingTimeout {
logger.Error().Msg("Stop resending. Notification interval is timed out")
} else {
for _, event := range pkg.Events {
subID := moira.UseString(event.SubscriptionID)
eventLogger := logger.Clone().String(moira.LogFieldNameSubscriptionID, subID)
SetLogLevelByConfig(notifier.config.LogSubscriptionsToLevel, subID, &eventLogger)
notification := notifier.scheduler.ScheduleNotification(time.Now(), event,
pkg.Trigger, pkg.Contact, pkg.Plotting, pkg.Throttled, pkg.FailCount+1, eventLogger)
if err := notifier.database.AddNotification(notification); err != nil {
eventLogger.Error().
Error(err).
Msg("Failed to save scheduled notification")
}
for _, event := range pkg.Events {
subID := moira.UseString(event.SubscriptionID)
eventLogger := logger.Clone().String(moira.LogFieldNameSubscriptionID, subID)
SetLogLevelByConfig(notifier.config.LogSubscriptionsToLevel, subID, &eventLogger)
notification := notifier.scheduler.ScheduleNotification(time.Now(), event,
pkg.Trigger, pkg.Contact, pkg.Plotting, pkg.Throttled, pkg.FailCount+1, eventLogger)
if err := notifier.database.AddNotification(notification); err != nil {
eventLogger.Error().
Error(err).
Msg("Failed to save scheduled notification")
}
}
}
Expand Down Expand Up @@ -230,7 +233,11 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific
Msg("Cannot send notification")
}

notifier.resend(&pkg, err.Error())
notifier.reschedule(&pkg, err.Error())
}
}
}

func (notifier *StandardNotifier) needToStop(failCount int) bool {
return time.Duration(failCount)*time.Minute > notifier.config.ResendingTimeout
}

0 comments on commit 6150f5c

Please sign in to comment.