Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(notifier): add ability to scale webhook and script #961

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/notifier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type notifierConfig struct {
MaxFailAttemptToSendAvailable int `yaml:"max_fail_attempt_to_send_available"`
// Specify log level by entities
SetLogLevel setLogLevelConfig `yaml:"set_log_level"`
// Sets the maximum number of sends per sender
MaxParallelSendsPerSender int `yaml:"max_parallel_sends_per_sender"`
kissken marked this conversation as resolved.
Show resolved Hide resolved
}

type selfStateConfig struct {
Expand Down Expand Up @@ -114,6 +116,7 @@ func getDefault() config {
Timezone: "UTC",
ReadBatchSize: int(notifier.NotificationsLimitUnlimited),
MaxFailAttemptToSendAvailable: 3,
MaxParallelSendsPerSender: 16,
},
Telemetry: cmd.TelemetryConfig{
Listen: ":8093",
Expand Down Expand Up @@ -208,6 +211,7 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config {
MaxFailAttemptToSendAvailable: config.MaxFailAttemptToSendAvailable,
LogContactsToLevel: contacts,
LogSubscriptionsToLevel: subscriptions,
MaxParallelSendsPerSender: config.MaxParallelSendsPerSender,
}
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/notifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,15 @@ func main() {
notifierConfig := config.Notifier.getSettings(logger)

notifierMetrics := metrics.ConfigureNotifierMetrics(telemetry.Metrics, serviceName)
scheduler := notifier.NewScheduler(database, logger, notifierMetrics)
sender := notifier.NewNotifier(
database,
logger,
notifierConfig,
notifierMetrics,
metricSourceProvider,
imageStoreMap,
scheduler,
)

// Register moira senders
Expand Down
2 changes: 1 addition & 1 deletion database/redis/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (connector *DbConnector) GetTriggerIDsStartWith(prefix string) ([]string, e
return matchedTriggers, nil
}

func (connector *DbConnector) updateTrigger(triggerID string, newTrigger *moira.Trigger, oldTrigger *moira.Trigger) error {
func (connector *DbConnector) updateTrigger(triggerID string, newTrigger *moira.Trigger, oldTrigger *moira.Trigger) error { // nolint:gocyclo
bytes, err := reply.GetTriggerBytes(triggerID, newTrigger)
if err != nil {
return err
Expand Down
15 changes: 9 additions & 6 deletions integration_tests/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ var location, _ = time.LoadLocation("UTC")
var dateTimeFormat = "15:04 02.01.2006"

var notifierConfig = notifier.Config{
SendingTimeout: time.Millisecond * 10,
ResendingTimeout: time.Hour * 24,
Location: location,
DateTimeFormat: dateTimeFormat,
ReadBatchSize: notifier.NotificationsLimitUnlimited,
SendingTimeout: time.Millisecond * 10,
ResendingTimeout: time.Hour * 24,
Location: location,
DateTimeFormat: dateTimeFormat,
ReadBatchSize: notifier.NotificationsLimitUnlimited,
MaxParallelSendsPerSender: 16,
}

var shutdown = make(chan struct{})
Expand Down Expand Up @@ -87,6 +88,7 @@ func TestNotifier(t *testing.T) {
database.PushNotificationEvent(&event, true) //nolint

metricsSourceProvider := metricSource.CreateMetricSourceProvider(local.Create(database), nil, nil)
scheduler := notifier.NewScheduler(database, logger, notifierMetrics)

notifierInstance := notifier.NewNotifier(
database,
Expand All @@ -95,6 +97,7 @@ func TestNotifier(t *testing.T) {
notifierMetrics,
metricsSourceProvider,
map[string]moira.ImageStore{},
scheduler,
)

sender := mock_moira_alert.NewMockSender(mockCtrl)
Expand All @@ -113,7 +116,7 @@ func TestNotifier(t *testing.T) {
Database: database,
Logger: logger,
Metrics: notifierMetrics,
Scheduler: notifier.NewScheduler(database, logger, notifierMetrics),
Scheduler: scheduler,
}

fetchNotificationsWorker := notifications.FetchNotificationsWorker{
Expand Down
1 change: 1 addition & 0 deletions local/notifier.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ notifier:
front_uri: http://localhost
timezone: UTC
date_time_format: "15:04 02.01.2006"
max_parallel_sends_per_sender: 16
kissken marked this conversation as resolved.
Show resolved Hide resolved
notification_history:
ttl: 48h
query_limit: 10000
Expand Down
1 change: 1 addition & 0 deletions notifier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ type Config struct {
MaxFailAttemptToSendAvailable int
LogContactsToLevel map[string]string
LogSubscriptionsToLevel map[string]string
MaxParallelSendsPerSender int
}
63 changes: 43 additions & 20 deletions notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,38 +73,59 @@ type Notifier interface {

// StandardNotifier represent notification functionality
type StandardNotifier struct {
waitGroup sync.WaitGroup
senders map[string]chan NotificationPackage
logger moira.Logger
database moira.Database
scheduler Scheduler
config Config
metrics *metrics.NotifierMetrics
metricSourceProvider *metricSource.SourceProvider
imageStores map[string]moira.ImageStore
waitGroup sync.WaitGroup
senders map[string]moira.Sender
sendersNotificationsCh map[string]chan NotificationPackage
logger moira.Logger
database moira.Database
scheduler Scheduler
config Config
metrics *metrics.NotifierMetrics
metricSourceProvider *metricSource.SourceProvider
imageStores map[string]moira.ImageStore
sendersNameToType map[string]string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

это для меня пометочка: смотрю зачем он остался...

Copy link
Member

@kissken kissken Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

это тоже мне, если нужно, то зачем такой тип поля, кажется, можно иначе
и перенести ближе к сендерам - по смыслу, чтоб не разносить

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Кажется, по смыслу оно не зависит от каждого конкретного сендера, а нужно именно для нотифаера. Поле маппит имя типа контакта = имени сендера на тип сендера, а если поле с именем не указано, как сейчас для почти всех сендеров, то маппит тип контакта = типу сендера на тип сендера

Таким образом, если хочется несколько сендеров одного типа, то достаточно добавить поле name и смотреть, чтобы оно совпадало с типом контакта

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

вот в этом Issue пользователь как раз написал, что такой вариант нужен и я как раз такой и сделал

sendersOnce map[string]*sync.Once
}

// NewNotifier is initializer for StandardNotifier
func NewNotifier(database moira.Database, logger moira.Logger, config Config, metrics *metrics.NotifierMetrics, metricSourceProvider *metricSource.SourceProvider, imageStoreMap map[string]moira.ImageStore) *StandardNotifier {
func NewNotifier(
database moira.Database,
logger moira.Logger,
config Config,
metrics *metrics.NotifierMetrics,
metricSourceProvider *metricSource.SourceProvider,
imageStoreMap map[string]moira.ImageStore,
scheduler Scheduler,
) *StandardNotifier {
return &StandardNotifier{
senders: make(map[string]chan NotificationPackage),
logger: logger,
database: database,
scheduler: NewScheduler(database, logger, metrics),
config: config,
metrics: metrics,
metricSourceProvider: metricSourceProvider,
imageStores: imageStoreMap,
senders: make(map[string]moira.Sender),
sendersNotificationsCh: make(map[string]chan NotificationPackage),
logger: logger,
database: database,
scheduler: scheduler,
config: config,
metrics: metrics,
metricSourceProvider: metricSourceProvider,
imageStores: imageStoreMap,
sendersNameToType: make(map[string]string),
sendersOnce: make(map[string]*sync.Once),
kissken marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Send is realization of StandardNotifier Send functionality
func (notifier *StandardNotifier) Send(pkg *NotificationPackage, waitGroup *sync.WaitGroup) {
ch, found := notifier.senders[pkg.Contact.Type]
if !found {
senderType, ok := notifier.sendersNameToType[pkg.Contact.Type]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pkg.Contact.Type - это на самом деле уникальный идентификатор?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

не поняла, как мы ловко определим тут, что это нужная нам почта/вебхук/и тд

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pkg.Contact.Type - у всех же будет лежать как лежал старое значение

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, будет лежать старое значение и все будет норм работать, тк будет матчиться тип сендера на тип контакта, а если пользователи захотят добавить несколько сендеров одного типа, то необходимо различать по полю name, которое должно будет совпадать с типом контакта

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

кажется, что-то непрозрачное
у меня была одна почта
потом появилась вторая

я первую назову: Старая почта, а новую - Новая почта

if !ok {
notifier.resend(pkg, fmt.Sprintf("Unknown contact type '%s' [%s]", pkg.Contact.Type, pkg))
return
}

ch, found := notifier.sendersNotificationsCh[senderType]
if !found {
notifier.resend(pkg, fmt.Sprintf("failed to get notification channel from '%s' [%s]", senderType, pkg))
return
}

waitGroup.Add(1)
go func(pkg *NotificationPackage) {
defer waitGroup.Done()
Expand Down Expand Up @@ -142,6 +163,7 @@ func (notifier *StandardNotifier) resend(pkg *NotificationPackage, reason string
notifier.metrics.MarkSendersDroppedNotifications(pkg.Contact.Type)
return
}

notifier.metrics.SendingFailed.Mark(1)
if metric, found := notifier.metrics.SendersFailedMetrics.GetRegisteredMeter(pkg.Contact.Type); found {
metric.Mark(1)
Expand Down Expand Up @@ -212,6 +234,7 @@ func (notifier *StandardNotifier) runSender(sender moira.Sender, ch chan Notific
notifier.metrics.MarkSendersOkMetrics(pkg.Contact.Type)
continue
}

switch e := err.(type) {
case moira.SenderBrokenContactError:
log.Warning().
Expand Down
74 changes: 40 additions & 34 deletions notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@ import (
)

var (
plots [][]byte
shutdown = make(chan struct{})
plots [][]byte
shutdown = make(chan struct{})
location, _ = time.LoadLocation("UTC")
dateTimeFormat = "15:04 02.01.2006"
defaultConfig = Config{
SendingTimeout: time.Millisecond * 10,
ResendingTimeout: time.Hour * 24,
Location: location,
DateTimeFormat: dateTimeFormat,
MaxParallelSendsPerSender: 16,
}
)

var (
mockCtrl *gomock.Controller
sender *mock_moira_alert.MockSender
notif *StandardNotifier
scheduler *mock_scheduler.MockScheduler
dataBase *mock_moira_alert.MockDatabase
logger moira.Logger
mockCtrl *gomock.Controller
sender *mock_moira_alert.MockSender
standardNotifier *StandardNotifier
scheduler *mock_scheduler.MockScheduler
dataBase *mock_moira_alert.MockDatabase
logger moira.Logger
)

func TestGetMetricNames(t *testing.T) {
Expand All @@ -39,6 +48,7 @@ func TestGetMetricNames(t *testing.T) {
actual := notificationsPackage.GetMetricNames()
So(actual, ShouldResemble, expected)
})

Convey("Test package with no trigger events", func() {
pkg := NotificationPackage{}
for _, event := range notificationsPackage.Events {
Expand All @@ -52,6 +62,7 @@ func TestGetMetricNames(t *testing.T) {
So(actual, ShouldResemble, expected)
})
})

Convey("Test empty notification package", t, func() {
emptyNotificationPackage := NotificationPackage{}
actual := emptyNotificationPackage.GetMetricNames()
Expand All @@ -66,6 +77,7 @@ func TestGetWindow(t *testing.T) {
So(from, ShouldEqual, 11)
So(to, ShouldEqual, 179)
})

Convey("Test empty notification package", t, func() {
emptyNotificationPackage := NotificationPackage{}
_, _, err := emptyNotificationPackage.GetWindow()
Expand All @@ -74,7 +86,7 @@ func TestGetWindow(t *testing.T) {
}

func TestUnknownContactType(t *testing.T) {
configureNotifier(t)
configureNotifier(t, defaultConfig)
defer afterTest()

var eventsData moira.NotificationEvents = []moira.NotificationEvent{event}
Expand All @@ -90,12 +102,12 @@ func TestUnknownContactType(t *testing.T) {
dataBase.EXPECT().AddNotification(&notification).Return(nil)

var wg sync.WaitGroup
notif.Send(&pkg, &wg)
standardNotifier.Send(&pkg, &wg)
wg.Wait()
}

func TestFailSendEvent(t *testing.T) {
configureNotifier(t)
configureNotifier(t, defaultConfig)
defer afterTest()

var eventsData moira.NotificationEvents = []moira.NotificationEvent{event}
Expand All @@ -106,13 +118,14 @@ func TestFailSendEvent(t *testing.T) {
Type: "test",
},
}

notification := moira.ScheduledNotification{}
sender.EXPECT().SendEvents(eventsData, pkg.Contact, pkg.Trigger, plots, pkg.Throttled).Return(fmt.Errorf("Cant't send"))
scheduler.EXPECT().ScheduleNotification(gomock.Any(), event, pkg.Trigger, pkg.Contact, pkg.Plotting, pkg.Throttled, pkg.FailCount+1, gomock.Any()).Return(&notification)
dataBase.EXPECT().AddNotification(&notification).Return(nil)

var wg sync.WaitGroup
notif.Send(&pkg, &wg)
standardNotifier.Send(&pkg, &wg)
wg.Wait()
time.Sleep(time.Second * 2)
}
Expand All @@ -122,7 +135,7 @@ func TestNoResendForSendToBrokenContact(t *testing.T) {
t.Skip("skipping test in short mode.")
}

configureNotifier(t)
configureNotifier(t, defaultConfig)
defer afterTest()

var eventsData moira.NotificationEvents = []moira.NotificationEvent{event}
Expand All @@ -137,13 +150,13 @@ func TestNoResendForSendToBrokenContact(t *testing.T) {
Return(moira.NewSenderBrokenContactError(fmt.Errorf("some sender reason")))

var wg sync.WaitGroup
notif.Send(&pkg, &wg)
standardNotifier.Send(&pkg, &wg)
wg.Wait()
time.Sleep(time.Second * 2)
}

func TestTimeout(t *testing.T) {
configureNotifier(t)
configureNotifier(t, defaultConfig)
var wg sync.WaitGroup
defer afterTest()

Expand All @@ -160,10 +173,10 @@ func TestTimeout(t *testing.T) {
sender.EXPECT().SendEvents(eventsData, pkg.Contact, pkg.Trigger, plots, pkg.Throttled).Return(nil).Do(func(arg0, arg1, arg2, arg3, arg4 interface{}) {
fmt.Print("Trying to send for 10 second")
time.Sleep(time.Second * 10)
}).Times(maxParallelSendsPerSender)
}).Times(standardNotifier.GetMaxParallelSendsPerSender())

for i := 0; i < maxParallelSendsPerSender; i++ {
notif.Send(&pkg, &wg)
for i := 0; i < standardNotifier.GetMaxParallelSendsPerSender(); i++ {
standardNotifier.Send(&pkg, &wg)
wg.Wait()
}

Expand All @@ -180,7 +193,7 @@ func TestTimeout(t *testing.T) {
scheduler.EXPECT().ScheduleNotification(gomock.Any(), event, pkg2.Trigger, pkg2.Contact, pkg.Plotting, pkg2.Throttled, pkg2.FailCount+1, gomock.Any()).Return(&notification)
dataBase.EXPECT().AddNotification(&notification).Return(nil).Do(func(f ...interface{}) { close(shutdown) })

notif.Send(&pkg2, &wg)
standardNotifier.Send(&pkg2, &wg)
wg.Wait()
waitTestEnd()
}
Expand All @@ -195,16 +208,8 @@ func waitTestEnd() {
}
}

func configureNotifier(t *testing.T) {
func configureNotifier(t *testing.T, config Config) {
notifierMetrics := metrics.ConfigureNotifierMetrics(metrics.NewDummyRegistry(), "notifier")
var location, _ = time.LoadLocation("UTC")
dateTimeFormat := "15:04 02.01.2006"
config := Config{
SendingTimeout: time.Millisecond * 10,
ResendingTimeout: time.Hour * 24,
Location: location,
DateTimeFormat: dateTimeFormat,
}

mockCtrl = gomock.NewController(t)
dataBase = mock_moira_alert.NewMockDatabase(mockCtrl)
Expand All @@ -213,24 +218,25 @@ func configureNotifier(t *testing.T) {
sender = mock_moira_alert.NewMockSender(mockCtrl)
metricsSourceProvider := metricSource.CreateMetricSourceProvider(local.Create(dataBase), nil, nil)

notif = NewNotifier(dataBase, logger, config, notifierMetrics, metricsSourceProvider, map[string]moira.ImageStore{})
notif.scheduler = scheduler
standardNotifier = NewNotifier(dataBase, logger, config, notifierMetrics, metricsSourceProvider, map[string]moira.ImageStore{}, scheduler)
senderSettings := map[string]interface{}{
"type": "test",
}

sender.EXPECT().Init(senderSettings, logger, location, "15:04 02.01.2006").Return(nil)
sender.EXPECT().Init(senderSettings, logger, location, dateTimeFormat).Return(nil)

notif.RegisterSender(senderSettings, sender) //nolint
err := standardNotifier.RegisterSender(senderSettings, sender)

Convey("Should return one sender", t, func() {
So(notif.GetSenders(), ShouldResemble, map[string]bool{"test": true})
So(err, ShouldBeNil)
So(standardNotifier.GetSenders(), ShouldResemble, map[string]bool{"test": true})
So(standardNotifier.sendersNameToType["test"], ShouldEqual, senderSettings["type"])
})
}

func afterTest() {
mockCtrl.Finish()
notif.StopSenders()
standardNotifier.StopSenders()
}

var subID = "SubscriptionID-000000000000001"
Expand Down
Loading
Loading