Skip to content

Commit

Permalink
Adding association between a Kafka cluster and a notifier
Browse files Browse the repository at this point in the history
Signed-off-by: muicoder <[email protected]>
linkedin#611
  • Loading branch information
muicoder committed Jul 6, 2023
1 parent e593345 commit 3123b2a
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 23 deletions.
1 change: 1 addition & 0 deletions config/burrow.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ min-distance=1

[notifier.default]
class-name="http"
cluster="local"
url-open="http://someservice.example.com:1467/v1/event"
interval=60
timeout=5
Expand Down
6 changes: 6 additions & 0 deletions core/internal/helpers/coordinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (m *MockModule) GetName() string {
return args.String(0)
}

// GetCluster mocks the notifier.Module GetCluster func
func (m *MockModule) GetCluster() string {
args := m.Called()
return args.String(0)
}

// GetGroupAllowlist mocks the notifier.Module GetGroupAllowlist func
func (m *MockModule) GetGroupAllowlist() *regexp.Regexp {
args := m.Called()
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request
SendClose: viper.GetBool(configRoot + ".send-close"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down Expand Up @@ -265,6 +266,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques
To: viper.GetString(configRoot + ".to"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type httpResponseConfigModuleNotifierHTTP struct {
SendClose bool `json:"send-close"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierSlack struct {
Expand Down Expand Up @@ -238,6 +239,7 @@ type httpResponseConfigModuleNotifierEmail struct {
To string `json:"to"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierNull struct {
Expand Down
13 changes: 11 additions & 2 deletions core/internal/notifier/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
type Module interface {
protocol.Module
GetName() string
GetCluster() string
GetGroupAllowlist() *regexp.Regexp
GetGroupDenylist() *regexp.Regexp
GetLogger() *zap.Logger
Expand Down Expand Up @@ -95,7 +96,7 @@ type Coordinator struct {

// getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there
// is any error, it will panic with an appropriate message describing the problem.
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module {
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module {
logger := app.Logger.With(
zap.String("type", "module"),
zap.String("coordinator", "notifier"),
Expand All @@ -113,6 +114,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "email":
return &EmailNotifier{
Expand All @@ -123,6 +125,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "null":
return &NullNotifier{
Expand All @@ -133,6 +136,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
default:
panic("Unknown notifier className provided: " + className)
Expand Down Expand Up @@ -194,6 +198,8 @@ func (nc *Coordinator) Configure() {
groupAllowlist = re
}

cluster := viper.GetString(configRoot + ".cluster")

// Compile the denylist for the consumer groups to not notify for
var groupDenylist *regexp.Regexp
denylist := viper.GetString(configRoot + ".group-denylist")
Expand Down Expand Up @@ -227,7 +233,7 @@ func (nc *Coordinator) Configure() {
templateClose = tmpl.Templates()[0]
}

module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose)
module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose, cluster)
module.Configure(name, configRoot)
nc.modules[name] = module
interval := viper.GetInt64(configRoot + ".interval")
Expand Down Expand Up @@ -436,6 +442,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer
for _, genericModule := range nc.modules {
module := genericModule.(Module)

if module.GetCluster() != "" && response.Cluster != module.GetCluster() {
continue
}
// No allowlist means everything passes
groupAllowlist := module.GetGroupAllowlist()
groupDenylist := module.GetGroupDenylist()
Expand Down
52 changes: 31 additions & 21 deletions core/internal/notifier/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,34 +472,39 @@ var notifyModuleTests = []struct {
ExpectClose bool
ExpectID bool
SendOnce bool
Cluster string
}{
// {1, 0, false, false, false, false, false},
// {2, 0, false, false, false, false, false},
// {1, 0, true, false, false, false, false},
// {1, 0, false, true, false, false, false},
// {1, 0, true, true, false, false, false},

{1, 1, false, false, true, false, false, false},
{1, 1, false, true, true, false, false, false},
{1, 1, true, false, true, false, false, false},
{1, 1, true, true, true, true, false, true},

{1, 2, false, false, true, false, true, false},
{1, 2, false, true, true, false, true, false},
{1, 2, true, false, true, false, true, false},
{1, 2, true, true, true, false, true, false},
{1, 2, true, true, false, false, true, true},
{1, 2, false, true, true, false, true, true},

{3, 2, false, false, false, false, true, false},
{3, 2, false, true, false, false, true, false},
{3, 2, true, false, false, false, true, false},
{3, 2, true, true, false, false, true, false},

{2, 1, false, false, false, false, false, false},
{2, 1, false, true, false, false, false, false},
{2, 1, true, false, false, false, false, false},
{2, 1, true, true, true, true, false, false},
{1, 1, false, false, true, false, false, false, ""},
{1, 1, false, true, true, false, false, false, "testcluster"},
{1, 1, true, false, true, false, false, false, "unmatchedCluster"},
{1, 1, true, true, true, true, false, true, ""},

{1, 2, false, false, true, false, true, false, ""},
{1, 2, false, true, true, false, true, false, ""},
{1, 2, true, false, true, false, true, false, ""},
{1, 2, true, true, true, false, true, false, ""},
{1, 2, true, true, false, false, true, true, ""},
{1, 2, false, true, true, false, true, true, ""},

{3, 2, false, false, false, false, true, false, ""},
{3, 2, false, true, false, false, true, false, ""},
{3, 2, true, false, false, false, true, false, ""},
{3, 2, true, true, false, false, true, false, ""},

{2, 1, false, false, false, false, false, false, ""},
{2, 1, false, true, false, false, false, false, ""},
{2, 1, true, false, false, false, false, false, ""},
{2, 1, true, true, true, true, false, false, ""},
}

func checkNotifierClusterMatch(cluster string) bool {
return cluster == "" || cluster == "testcluster"
}

func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
Expand Down Expand Up @@ -558,10 +563,15 @@ func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
// Set up the mock module and expected calls
mockModule := &helpers.MockModule{}
coordinator.modules["test"] = mockModule
mockModule.On("GetCluster").Return(testSet.Cluster)

if checkNotifierClusterMatch(testSet.Cluster) {
mockModule.On("GetName").Return("test")
mockModule.On("GetGroupAllowlist").Return((*regexp.Regexp)(nil))
mockModule.On("GetGroupDenylist").Return((*regexp.Regexp)(nil))
mockModule.On("AcceptConsumerGroup", response).Return(true)
}

if testSet.ExpectSend {
mockModule.On("Notify", response, mock.MatchedBy(func(s string) bool { return true }), mock.MatchedBy(func(t time.Time) bool { return true }), testSet.ExpectClose).Return()
}
Expand Down
6 changes: 6 additions & 0 deletions core/internal/notifier/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type EmailNotifier struct {
Log *zap.Logger

name string
cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -139,6 +140,11 @@ func (module *EmailNotifier) GetName() string {
return module.name
}

// GetCluster returns the configured name of this module
func (module *EmailNotifier) GetCluster() string {
return module.cluster
}

// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *EmailNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
Expand Down
5 changes: 5 additions & 0 deletions core/internal/notifier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type HTTPNotifier struct {
Log *zap.Logger

name string
cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -124,6 +125,10 @@ func (module *HTTPNotifier) GetName() string {
return module.name
}

func (module *HTTPNotifier) GetCluster() string {
return module.cluster
}

// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *HTTPNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
Expand Down
6 changes: 6 additions & 0 deletions core/internal/notifier/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type NullNotifier struct {
Log *zap.Logger

name string
cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -75,6 +76,11 @@ func (module *NullNotifier) GetName() string {
return module.name
}

// GetCluster returns the configured name of this module
func (module *NullNotifier) GetCluster() string {
return module.cluster
}

// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *NullNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
Expand Down

0 comments on commit 3123b2a

Please sign in to comment.