Skip to content

Commit

Permalink
Merge pull request #297 from allegro/consuldatacenters
Browse files Browse the repository at this point in the history
don't search in all DCs if configured
  • Loading branch information
ojagodzinski authored Nov 12, 2019
2 parents 3d0bb6d + 833e9b7 commit 775145b
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .goxc.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"deb-source"
],
"BuildConstraints": "linux,!arm darwin",
"PackageVersion": "1.5.1",
"PackageVersion": "1.5.2",
"TaskSettings": {
"bintray": {
"downloadspage": "bintray.md",
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ FPM-exists:
@fpm -v || \
(echo >&2 "FPM must be installed on the system. See https://github.com/jordansissel/fpm"; false)

deb: FPM-exists build
deb: FPM-exists build-linux
mkdir -p dist/$(VERSION)/
cd dist/$(VERSION)/ && \
fpm -s dir \
Expand All @@ -73,6 +73,7 @@ deb: FPM-exists build
-v $(VERSION) \
--url="https://github.com/allegro/marathon-consul" \
--vendor=Allegro \
--architecture=amd64 \
--maintainer="Allegro Group <[email protected]>" \
--description "Marathon-consul service (performs Marathon Tasks registration as Consul Services for service discovery) Marathon-consul takes information provided by the Marathon event bus and forwards it to Consul agents. It also re-syncs all the information from Marathon to Consul on startup and repeats it with given interval." \
--deb-priority optional \
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (config *Config) parseFlags() {
flag.StringVar(&config.Consul.IgnoredHealthChecks, "consul-ignored-healthchecks", "", "A comma separated blacklist of Marathon health check types that will not be migrated to Consul, e.g. command,tcp")
flag.BoolVar(&config.Consul.EnableTagOverride, "consul-enable-tag-override", false, "Disable the anti-entropy feature for all services")
flag.StringVar(&config.Consul.LocalAgentHost, "consul-local-agent-host", "", "Consul Agent hostname or IP that should be used for startup sync")
flag.StringVar(&config.Consul.Dc, "consul-dc", "", "Consul DC where to look for services, all if empty")

// Web
flag.StringVar(&config.Web.Listen, "listen", ":4000", "Accept connections at this address")
Expand Down
1 change: 1 addition & 0 deletions consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Config struct {
SslCaCert string
Token string
Tag string
Dc string
Timeout time.Interval
RequestRetries uint32
AgentFailuresTolerance uint32
Expand Down
22 changes: 15 additions & 7 deletions consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *Consul) getServicesUsingProviderWithRetriesOnAgentFailure(provide Servi
}

func (c *Consul) getServicesUsingAgent(name string, agent *consulAPI.Client) ([]*service.Service, error) {
dcAwareQueries, err := dcAwareQueriesForAllDCs(agent)
dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc)
if err != nil {
return nil, err
}
Expand All @@ -80,12 +80,18 @@ func (c *Consul) getServicesUsingAgent(name string, agent *consulAPI.Client) ([]
return allServices, nil
}

func dcAwareQueriesForAllDCs(agent *consulAPI.Client) ([]*consulAPI.QueryOptions, error) {
func dcAwareQueries(agent *consulAPI.Client, singleDc string) ([]*consulAPI.QueryOptions, error) {
if singleDc != "" {
var queries []*consulAPI.QueryOptions
queries = append(queries, &consulAPI.QueryOptions{
Datacenter: singleDc,
})
return queries, nil
}
datacenters, err := agent.Catalog().Datacenters()
if err != nil {
return nil, err
}

var queries []*consulAPI.QueryOptions
for _, dc := range datacenters {
queries = append(queries, &consulAPI.QueryOptions{
Expand All @@ -101,7 +107,7 @@ func (c *Consul) GetAllServices() ([]*service.Service, error) {
}

func (c *Consul) getAllServices(agent *consulAPI.Client) ([]*service.Service, error) {
dcAwareQueries, err := dcAwareQueriesForAllDCs(agent)
dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc)
if err != nil {
return nil, err
}
Expand All @@ -110,7 +116,8 @@ func (c *Consul) getAllServices(agent *consulAPI.Client) ([]*service.Service, er
for _, dcAwareQuery := range dcAwareQueries {
consulServices, _, err := agent.Catalog().Services(dcAwareQuery)
if err != nil {
return nil, err
log.WithError(err).Error("An error occurred getting services from Consul, will continue with another DC")
continue
}
for consulService, tags := range consulServices {
if contains(tags, c.config.Tag) {
Expand Down Expand Up @@ -231,7 +238,7 @@ func (c *Consul) deregisterMultipleServices(services []*service.Service, taskID

func (c *Consul) findServicesByTaskID(searchedTaskID apps.TaskID) ([]*service.Service, error) {
return c.getServicesUsingProviderWithRetriesOnAgentFailure(func(agent *consulAPI.Client) ([]*service.Service, error) {
dcAwareQueries, err := dcAwareQueriesForAllDCs(agent)
dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc)
if err != nil {
return nil, err
}
Expand All @@ -241,7 +248,8 @@ func (c *Consul) findServicesByTaskID(searchedTaskID apps.TaskID) ([]*service.Se
for _, dcAwareQuery := range dcAwareQueries {
consulServices, _, err := agent.Catalog().Services(dcAwareQuery)
if err != nil {
return nil, err
log.WithError(err).Error("An error occurred getting services from Consul, will continue with another DC")
continue
}
for consulService, tags := range consulServices {
if contains(tags, searchedTag) {
Expand Down
44 changes: 44 additions & 0 deletions consul/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,50 @@ func TestGetAllServices(t *testing.T) {
assert.Contains(t, serviceNames, "serviceB")
}

func TestGetServicesFromSingleDc(t *testing.T) {
t.Parallel()
// create cluster of 2 consul servers
server1 := CreateTestServerDatacenter(t, "dc-1")
defer server1.Stop()

server2 := CreateTestServerDatacenter(t, "dc-2")
defer server2.Stop()

server1.JoinWAN(t, server2.LANAddr)

// create client
consul := ClientAtServer(server1)
consul.config.Tag = "marathon"
// configure fetching services from single datacenter
consul.config.Dc = "dc-1"

// given
// register services in both servers
server1.AddService(t, "serviceA", "passing", []string{"public", "marathon"})
server1.AddService(t, "serviceB", "passing", []string{"marathon"})
server1.AddService(t, "serviceC", "passing", []string{"zookeeper"})

server2.AddService(t, "serviceA", "passing", []string{"private", "marathon"})
server2.AddService(t, "serviceB", "passing", []string{"zookeeper"})
server2.AddService(t, "serviceD", "passing", []string{"marathon"})

// when
services, err := consul.GetAllServices()

// then
assert.NoError(t, err)
assert.Len(t, services, 2)

serviceNames := make(map[string]struct{})
for _, s := range services {
serviceNames[s.Name] = struct{}{}
}
assert.Len(t, serviceNames, 2)
assert.Contains(t, serviceNames, "serviceA")
assert.Contains(t, serviceNames, "serviceB")
assert.NotContains(t, serviceNames, "serviceD")
}

func TestGetServicesUsingProviderWithRetriesOnAgentFailure_ShouldRetryConfiguredNumberOfTimes(t *testing.T) {
t.Parallel()
server1 := CreateTestServer(t)
Expand Down
11 changes: 11 additions & 0 deletions consul/consul_test_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ func CreateTestServer(t *testing.T) *testutil.TestServer {
return server
}

func CreateTestServerDatacenter(t *testing.T, dc string) *testutil.TestServer {
server, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
c.Datacenter = dc
c.Ports = testPortConfig(t)
})

assert.NoError(t, err)

return server
}

const MasterToken = "masterToken"

func CreateSecuredTestServer(t *testing.T) *testutil.TestServer {
Expand Down

0 comments on commit 775145b

Please sign in to comment.