diff --git a/.goxc.json b/.goxc.json index d07e4f2..30aad63 100644 --- a/.goxc.json +++ b/.goxc.json @@ -8,7 +8,7 @@ "deb-source" ], "BuildConstraints": "linux,!arm darwin", - "PackageVersion": "1.5.1", + "PackageVersion": "1.5.2", "TaskSettings": { "bintray": { "downloadspage": "bintray.md", diff --git a/Makefile b/Makefile index 3541a03..6414ff1 100644 --- a/Makefile +++ b/Makefile @@ -64,7 +64,7 @@ FPM-exists: @fpm -v || \ (echo >&2 "FPM must be installed on the system. See https://github.com/jordansissel/fpm"; false) -deb: FPM-exists build +deb: FPM-exists build-linux mkdir -p dist/$(VERSION)/ cd dist/$(VERSION)/ && \ fpm -s dir \ @@ -73,6 +73,7 @@ deb: FPM-exists build -v $(VERSION) \ --url="https://github.com/allegro/marathon-consul" \ --vendor=Allegro \ + --architecture=amd64 \ --maintainer="Allegro Group " \ --description "Marathon-consul service (performs Marathon Tasks registration as Consul Services for service discovery) Marathon-consul takes information provided by the Marathon event bus and forwards it to Consul agents. It also re-syncs all the information from Marathon to Consul on startup and repeats it with given interval." \ --deb-priority optional \ diff --git a/config/config.go b/config/config.go index 79623cd..a166c88 100644 --- a/config/config.go +++ b/config/config.go @@ -79,6 +79,7 @@ func (config *Config) parseFlags() { flag.StringVar(&config.Consul.IgnoredHealthChecks, "consul-ignored-healthchecks", "", "A comma separated blacklist of Marathon health check types that will not be migrated to Consul, e.g. command,tcp") flag.BoolVar(&config.Consul.EnableTagOverride, "consul-enable-tag-override", false, "Disable the anti-entropy feature for all services") flag.StringVar(&config.Consul.LocalAgentHost, "consul-local-agent-host", "", "Consul Agent hostname or IP that should be used for startup sync") + flag.StringVar(&config.Consul.Dc, "consul-dc", "", "Consul DC where to look for services, all if empty") // Web flag.StringVar(&config.Web.Listen, "listen", ":4000", "Accept connections at this address") diff --git a/consul/config.go b/consul/config.go index e6ee4a3..0a1f3a3 100644 --- a/consul/config.go +++ b/consul/config.go @@ -11,6 +11,7 @@ type Config struct { SslCaCert string Token string Tag string + Dc string Timeout time.Interval RequestRetries uint32 AgentFailuresTolerance uint32 diff --git a/consul/consul.go b/consul/consul.go index 601306e..61222b5 100644 --- a/consul/consul.go +++ b/consul/consul.go @@ -64,7 +64,7 @@ func (c *Consul) getServicesUsingProviderWithRetriesOnAgentFailure(provide Servi } func (c *Consul) getServicesUsingAgent(name string, agent *consulAPI.Client) ([]*service.Service, error) { - dcAwareQueries, err := dcAwareQueriesForAllDCs(agent) + dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc) if err != nil { return nil, err } @@ -80,12 +80,18 @@ func (c *Consul) getServicesUsingAgent(name string, agent *consulAPI.Client) ([] return allServices, nil } -func dcAwareQueriesForAllDCs(agent *consulAPI.Client) ([]*consulAPI.QueryOptions, error) { +func dcAwareQueries(agent *consulAPI.Client, singleDc string) ([]*consulAPI.QueryOptions, error) { + if singleDc != "" { + var queries []*consulAPI.QueryOptions + queries = append(queries, &consulAPI.QueryOptions{ + Datacenter: singleDc, + }) + return queries, nil + } datacenters, err := agent.Catalog().Datacenters() if err != nil { return nil, err } - var queries []*consulAPI.QueryOptions for _, dc := range datacenters { queries = append(queries, &consulAPI.QueryOptions{ @@ -101,7 +107,7 @@ func (c *Consul) GetAllServices() ([]*service.Service, error) { } func (c *Consul) getAllServices(agent *consulAPI.Client) ([]*service.Service, error) { - dcAwareQueries, err := dcAwareQueriesForAllDCs(agent) + dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc) if err != nil { return nil, err } @@ -110,7 +116,8 @@ func (c *Consul) getAllServices(agent *consulAPI.Client) ([]*service.Service, er for _, dcAwareQuery := range dcAwareQueries { consulServices, _, err := agent.Catalog().Services(dcAwareQuery) if err != nil { - return nil, err + log.WithError(err).Error("An error occurred getting services from Consul, will continue with another DC") + continue } for consulService, tags := range consulServices { if contains(tags, c.config.Tag) { @@ -231,7 +238,7 @@ func (c *Consul) deregisterMultipleServices(services []*service.Service, taskID func (c *Consul) findServicesByTaskID(searchedTaskID apps.TaskID) ([]*service.Service, error) { return c.getServicesUsingProviderWithRetriesOnAgentFailure(func(agent *consulAPI.Client) ([]*service.Service, error) { - dcAwareQueries, err := dcAwareQueriesForAllDCs(agent) + dcAwareQueries, err := dcAwareQueries(agent, c.config.Dc) if err != nil { return nil, err } @@ -241,7 +248,8 @@ func (c *Consul) findServicesByTaskID(searchedTaskID apps.TaskID) ([]*service.Se for _, dcAwareQuery := range dcAwareQueries { consulServices, _, err := agent.Catalog().Services(dcAwareQuery) if err != nil { - return nil, err + log.WithError(err).Error("An error occurred getting services from Consul, will continue with another DC") + continue } for consulService, tags := range consulServices { if contains(tags, searchedTag) { diff --git a/consul/consul_test.go b/consul/consul_test.go index 790c504..ed8ff47 100644 --- a/consul/consul_test.go +++ b/consul/consul_test.go @@ -267,6 +267,50 @@ func TestGetAllServices(t *testing.T) { assert.Contains(t, serviceNames, "serviceB") } +func TestGetServicesFromSingleDc(t *testing.T) { + t.Parallel() + // create cluster of 2 consul servers + server1 := CreateTestServerDatacenter(t, "dc-1") + defer server1.Stop() + + server2 := CreateTestServerDatacenter(t, "dc-2") + defer server2.Stop() + + server1.JoinWAN(t, server2.LANAddr) + + // create client + consul := ClientAtServer(server1) + consul.config.Tag = "marathon" + // configure fetching services from single datacenter + consul.config.Dc = "dc-1" + + // given + // register services in both servers + server1.AddService(t, "serviceA", "passing", []string{"public", "marathon"}) + server1.AddService(t, "serviceB", "passing", []string{"marathon"}) + server1.AddService(t, "serviceC", "passing", []string{"zookeeper"}) + + server2.AddService(t, "serviceA", "passing", []string{"private", "marathon"}) + server2.AddService(t, "serviceB", "passing", []string{"zookeeper"}) + server2.AddService(t, "serviceD", "passing", []string{"marathon"}) + + // when + services, err := consul.GetAllServices() + + // then + assert.NoError(t, err) + assert.Len(t, services, 2) + + serviceNames := make(map[string]struct{}) + for _, s := range services { + serviceNames[s.Name] = struct{}{} + } + assert.Len(t, serviceNames, 2) + assert.Contains(t, serviceNames, "serviceA") + assert.Contains(t, serviceNames, "serviceB") + assert.NotContains(t, serviceNames, "serviceD") +} + func TestGetServicesUsingProviderWithRetriesOnAgentFailure_ShouldRetryConfiguredNumberOfTimes(t *testing.T) { t.Parallel() server1 := CreateTestServer(t) diff --git a/consul/consul_test_server.go b/consul/consul_test_server.go index d45dab5..5ab5f65 100644 --- a/consul/consul_test_server.go +++ b/consul/consul_test_server.go @@ -23,6 +23,17 @@ func CreateTestServer(t *testing.T) *testutil.TestServer { return server } +func CreateTestServerDatacenter(t *testing.T, dc string) *testutil.TestServer { + server, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) { + c.Datacenter = dc + c.Ports = testPortConfig(t) + }) + + assert.NoError(t, err) + + return server +} + const MasterToken = "masterToken" func CreateSecuredTestServer(t *testing.T) *testutil.TestServer {