diff --git a/pkg/router/router.go b/pkg/router/router.go index d9931bc9..1de93326 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -3,7 +3,6 @@ package router import ( "fmt" "net/url" - "reflect" "strings" "time" @@ -12,17 +11,20 @@ import ( // ServiceRouter is responsible for routing a message to a specific notification service using the notification URL type ServiceRouter struct { - logger t.StdLogger - services []t.Service - queue []string - Timeout time.Duration + logger t.StdLogger + services []t.Service + serviceIds []string + schemeCount map[string]int + queue []string + Timeout time.Duration } // New creates a new service router using the specified logger and service URLs func New(logger t.StdLogger, serviceURLs ...string) (*ServiceRouter, error) { router := ServiceRouter{ - logger: logger, - Timeout: 10 * time.Second, + logger: logger, + schemeCount: map[string]int{}, + Timeout: 10 * time.Second, } for _, serviceURL := range serviceURLs { @@ -35,13 +37,29 @@ func New(logger t.StdLogger, serviceURLs ...string) (*ServiceRouter, error) { // AddService initializes the specified service from its URL, and adds it if no errors occur func (router *ServiceRouter) AddService(serviceURL string) error { - service, err := router.initService(serviceURL) + service, scheme, err := router.initService(serviceURL) if err == nil { router.services = append(router.services, service) + router.serviceIds = append(router.serviceIds, router.getNextServiceId(scheme)) } return err } +func (router *ServiceRouter) getNextServiceId(scheme string) string { + if router.schemeCount == nil { + router.schemeCount = map[string]int{} + } + schemeIndex := router.schemeCount[scheme] + 1 + router.schemeCount[scheme] = schemeIndex + + // Only append a sequence number for the second and subsequent instances + if schemeIndex < 2 { + return scheme + } + + return scheme + fmt.Sprintf("%v", schemeIndex) +} + // Send sends the specified message using the routers underlying services func (router *ServiceRouter) Send(message string, params *t.Params) []error { if router == nil { @@ -91,8 +109,8 @@ func (router *ServiceRouter) SendAsync(message string, params *t.Params) chan er if params == nil { params = &t.Params{} } - for _, service := range router.services { - go sendToService(service, proxy, router.Timeout, message, *params) + for i, service := range router.services { + go sendToService(service, proxy, router.Timeout, message, *params, router.serviceIds[i]) } go func() { @@ -105,20 +123,22 @@ func (router *ServiceRouter) SendAsync(message string, params *t.Params) chan er return errors } -func sendToService(service t.Service, results chan error, timeout time.Duration, message string, params t.Params) { +func sendToService(service t.Service, results chan error, timeout time.Duration, message string, params t.Params, serviceId string) { result := make(chan error) - // TODO: There really ought to be a better way to name the services - pkg := reflect.TypeOf(service).Elem().PkgPath() - serviceName := pkg[strings.LastIndex(pkg, "/")+1:] - - go func() { result <- service.Send(message, ¶ms) }() + go func() { + err := service.Send(message, ¶ms) + if err != nil { + err = fmt.Errorf("%v: %v", serviceId, err) + } + result <- err + }() select { case res := <-result: results <- res case <-time.After(timeout): - results <- fmt.Errorf("failed to send using %v: timed out", serviceName) + results <- fmt.Errorf("failed to send using %v: timed out", serviceId) } } @@ -173,37 +193,34 @@ func (router *ServiceRouter) Route(rawURL string, message string) error { return service.Send(message, nil) } -func (router *ServiceRouter) initService(rawURL string) (t.Service, error) { +func (router *ServiceRouter) initService(rawURL string) (service t.Service, scheme string, err error) { - scheme, configURL, err := router.ExtractServiceName(rawURL) + var configURL *url.URL + scheme, configURL, err = router.ExtractServiceName(rawURL) if err != nil { - return nil, err + return } - service, err := newService(scheme) + service, err = newService(scheme) if err != nil { - return nil, err + return } if configURL.Scheme != scheme { router.log("Got custom URL:", configURL.String()) customURLService, ok := service.(t.CustomURLService) if !ok { - return nil, fmt.Errorf("custom URLs are not supported by '%s' service", scheme) + return nil, scheme, fmt.Errorf("custom URLs are not supported by '%s' service", scheme) } configURL, err = customURLService.GetConfigURLFromCustom(configURL) if err != nil { - return nil, err + return } router.log("Converted service URL:", configURL.String()) } err = service.Initialize(configURL, router.logger) - if err != nil { - return service, err - } - - return service, nil + return } // NewService returns a new uninitialized service instance @@ -233,9 +250,14 @@ func (router *ServiceRouter) ListServices() []string { return services } +// ListAddedServices returns a list of the scheme identifiers of the added services +func (router *ServiceRouter) ListAddedServices() []string { + return router.serviceIds +} + // Locate returns the service implementation that corresponds to the given service URL func (router *ServiceRouter) Locate(rawURL string) (t.Service, error) { - service, err := router.initService(rawURL) + service, _, err := router.initService(rawURL) return service, err } diff --git a/pkg/router/router_suite_test.go b/pkg/router/router_suite_test.go index aa6c7942..b231c295 100644 --- a/pkg/router/router_suite_test.go +++ b/pkg/router/router_suite_test.go @@ -57,12 +57,24 @@ var _ = Describe("the router suite", func() { When("initializing a service with a custom URL", func() { It("should return an error if the service does not support it", func() { - service, err := sr.initService("log+https://hybr.is") + service, scheme, err := sr.initService("logger+https://hybr.is") Expect(err).To(HaveOccurred()) + Expect(scheme).To(Equal("logger")) Expect(service).To(BeNil()) }) }) + When("listing added services", func() { + When("multiple instances of the same service have been added", func() { + It("should return a list with unique identifiers for those services", func() { + Expect(sr.AddService("logger://")).To(Succeed()) + Expect(sr.AddService("logger://")).To(Succeed()) + Expect(sr.AddService("logger://")).To(Succeed()) + Expect(sr.ListAddedServices()).To(ConsistOf("logger", "logger2", "logger3")) + }) + }) + }) + Describe("the service map", func() { When("resolving implemented services", func() { services := (&ServiceRouter{}).ListServices() @@ -83,13 +95,15 @@ var _ = Describe("the router suite", func() { When("initializing a service with a custom URL", func() { It("should return an error if the service does not support it", func() { - service, err := sr.initService("log+https://hybr.is") + service, scheme, err := sr.initService("logger+https://hybr.is") Expect(err).To(HaveOccurred()) + Expect(scheme).To(Equal("logger")) Expect(service).To(BeNil()) }) It("should successfully init a service that does support it", func() { - service, err := sr.initService(mockCustomURL) + service, scheme, err := sr.initService(mockCustomURL) Expect(err).NotTo(HaveOccurred()) + Expect(scheme).To(Equal("teams")) Expect(service).NotTo(BeNil()) }) }) @@ -120,7 +134,7 @@ var _ = Describe("the router suite", func() { When("router has not been provided a logger", func() { It("should not crash when trying to log", func() { router := ServiceRouter{} - _, err := router.initService(mockCustomURL) + _, _, err := router.initService(mockCustomURL) Expect(err).NotTo(HaveOccurred()) }) })