Skip to content

Commit

Permalink
ignore if datacenter is in failed state
Browse files Browse the repository at this point in the history
  • Loading branch information
Oskar Jagodzinski committed Nov 12, 2019
1 parent 3316148 commit 833e9b7
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .goxc.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"deb-source"
],
"BuildConstraints": "linux,!arm darwin",
"PackageVersion": "1.5.1",
"PackageVersion": "1.5.2",
"TaskSettings": {
"bintray": {
"downloadspage": "bintray.md",
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ FPM-exists:
@fpm -v || \
(echo >&2 "FPM must be installed on the system. See https://github.com/jordansissel/fpm"; false)

deb: FPM-exists build
deb: FPM-exists build-linux
mkdir -p dist/$(VERSION)/
cd dist/$(VERSION)/ && \
fpm -s dir \
Expand All @@ -73,6 +73,7 @@ deb: FPM-exists build
-v $(VERSION) \
--url="https://github.com/allegro/marathon-consul" \
--vendor=Allegro \
--architecture=amd64 \
--maintainer="Allegro Group <[email protected]>" \
--description "Marathon-consul service (performs Marathon Tasks registration as Consul Services for service discovery) Marathon-consul takes information provided by the Marathon event bus and forwards it to Consul agents. It also re-syncs all the information from Marathon to Consul on startup and repeats it with given interval." \
--deb-priority optional \
Expand Down
18 changes: 10 additions & 8 deletions consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *Consul) getServicesUsingProviderWithRetriesOnAgentFailure(provide Servi
}

func (c *Consul) getServicesUsingAgent(name string, agent *consulAPI.Client) ([]*service.Service, error) {
dcAwareQueries, err := dcAwareQueriesForAllDCs(agent, c.config.Dc)
dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc)
if err != nil {
return nil, err
}
Expand All @@ -80,11 +80,11 @@ func (c *Consul) getServicesUsingAgent(name string, agent *consulAPI.Client) ([]
return allServices, nil
}

func dcAwareQueriesForAllDCs(agent *consulAPI.Client, dc string) ([]*consulAPI.QueryOptions, error) {
if dc != "" {
func dcAwareQueries(agent *consulAPI.Client, singleDc string) ([]*consulAPI.QueryOptions, error) {
if singleDc != "" {
var queries []*consulAPI.QueryOptions
queries = append(queries, &consulAPI.QueryOptions{
Datacenter: dc,
Datacenter: singleDc,
})
return queries, nil
}
Expand All @@ -107,7 +107,7 @@ func (c *Consul) GetAllServices() ([]*service.Service, error) {
}

func (c *Consul) getAllServices(agent *consulAPI.Client) ([]*service.Service, error) {
dcAwareQueries, err := dcAwareQueriesForAllDCs(agent, c.config.Dc)
dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc)
if err != nil {
return nil, err
}
Expand All @@ -116,7 +116,8 @@ func (c *Consul) getAllServices(agent *consulAPI.Client) ([]*service.Service, er
for _, dcAwareQuery := range dcAwareQueries {
consulServices, _, err := agent.Catalog().Services(dcAwareQuery)
if err != nil {
return nil, err
log.WithError(err).Error("An error occurred getting services from Consul, will continue with another DC")
continue
}
for consulService, tags := range consulServices {
if contains(tags, c.config.Tag) {
Expand Down Expand Up @@ -237,7 +238,7 @@ func (c *Consul) deregisterMultipleServices(services []*service.Service, taskID

func (c *Consul) findServicesByTaskID(searchedTaskID apps.TaskID) ([]*service.Service, error) {
return c.getServicesUsingProviderWithRetriesOnAgentFailure(func(agent *consulAPI.Client) ([]*service.Service, error) {
dcAwareQueries, err := dcAwareQueriesForAllDCs(agent, c.config.Dc)
dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc)
if err != nil {
return nil, err
}
Expand All @@ -247,7 +248,8 @@ func (c *Consul) findServicesByTaskID(searchedTaskID apps.TaskID) ([]*service.Se
for _, dcAwareQuery := range dcAwareQueries {
consulServices, _, err := agent.Catalog().Services(dcAwareQuery)
if err != nil {
return nil, err
log.WithError(err).Error("An error occurred getting services from Consul, will continue with another DC")
continue
}
for consulService, tags := range consulServices {
if contains(tags, searchedTag) {
Expand Down
44 changes: 44 additions & 0 deletions consul/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,50 @@ func TestGetAllServices(t *testing.T) {
assert.Contains(t, serviceNames, "serviceB")
}

func TestGetServicesFromSingleDc(t *testing.T) {
t.Parallel()
// create cluster of 2 consul servers
server1 := CreateTestServerDatacenter(t, "dc-1")
defer server1.Stop()

server2 := CreateTestServerDatacenter(t, "dc-2")
defer server2.Stop()

server1.JoinWAN(t, server2.LANAddr)

// create client
consul := ClientAtServer(server1)
consul.config.Tag = "marathon"
// configure fetching services from single datacenter
consul.config.Dc = "dc-1"

// given
// register services in both servers
server1.AddService(t, "serviceA", "passing", []string{"public", "marathon"})
server1.AddService(t, "serviceB", "passing", []string{"marathon"})
server1.AddService(t, "serviceC", "passing", []string{"zookeeper"})

server2.AddService(t, "serviceA", "passing", []string{"private", "marathon"})
server2.AddService(t, "serviceB", "passing", []string{"zookeeper"})
server2.AddService(t, "serviceD", "passing", []string{"marathon"})

// when
services, err := consul.GetAllServices()

// then
assert.NoError(t, err)
assert.Len(t, services, 2)

serviceNames := make(map[string]struct{})
for _, s := range services {
serviceNames[s.Name] = struct{}{}
}
assert.Len(t, serviceNames, 2)
assert.Contains(t, serviceNames, "serviceA")
assert.Contains(t, serviceNames, "serviceB")
assert.NotContains(t, serviceNames, "serviceD")
}

func TestGetServicesUsingProviderWithRetriesOnAgentFailure_ShouldRetryConfiguredNumberOfTimes(t *testing.T) {
t.Parallel()
server1 := CreateTestServer(t)
Expand Down
11 changes: 11 additions & 0 deletions consul/consul_test_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ func CreateTestServer(t *testing.T) *testutil.TestServer {
return server
}

func CreateTestServerDatacenter(t *testing.T, dc string) *testutil.TestServer {
server, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
c.Datacenter = dc
c.Ports = testPortConfig(t)
})

assert.NoError(t, err)

return server
}

const MasterToken = "masterToken"

func CreateSecuredTestServer(t *testing.T) *testutil.TestServer {
Expand Down

0 comments on commit 833e9b7

Please sign in to comment.