Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add unique service instance identifiers to router #409

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 52 additions & 30 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package router
import (
"fmt"
"net/url"
"reflect"
"strings"
"time"

Expand All @@ -12,17 +11,20 @@ import (

// ServiceRouter is responsible for routing a message to a specific notification service using the notification URL
type ServiceRouter struct {
logger t.StdLogger
services []t.Service
queue []string
Timeout time.Duration
logger t.StdLogger
services []t.Service
serviceIds []string
schemeCount map[string]int
queue []string
Timeout time.Duration
}

// New creates a new service router using the specified logger and service URLs
func New(logger t.StdLogger, serviceURLs ...string) (*ServiceRouter, error) {
router := ServiceRouter{
logger: logger,
Timeout: 10 * time.Second,
logger: logger,
schemeCount: map[string]int{},
Timeout: 10 * time.Second,
}

for _, serviceURL := range serviceURLs {
Expand All @@ -35,13 +37,29 @@ func New(logger t.StdLogger, serviceURLs ...string) (*ServiceRouter, error) {

// AddService initializes the specified service from its URL, and adds it if no errors occur
func (router *ServiceRouter) AddService(serviceURL string) error {
service, err := router.initService(serviceURL)
service, scheme, err := router.initService(serviceURL)
if err == nil {
router.services = append(router.services, service)
router.serviceIds = append(router.serviceIds, router.getNextServiceId(scheme))
}
return err
}

func (router *ServiceRouter) getNextServiceId(scheme string) string {
if router.schemeCount == nil {
router.schemeCount = map[string]int{}
}
schemeIndex := router.schemeCount[scheme] + 1
router.schemeCount[scheme] = schemeIndex

// Only append a sequence number for the second and subsequent instances
if schemeIndex < 2 {
return scheme
}

return scheme + fmt.Sprintf("%v", schemeIndex)
}

// Send sends the specified message using the routers underlying services
func (router *ServiceRouter) Send(message string, params *t.Params) []error {
if router == nil {
Expand Down Expand Up @@ -91,8 +109,8 @@ func (router *ServiceRouter) SendAsync(message string, params *t.Params) chan er
if params == nil {
params = &t.Params{}
}
for _, service := range router.services {
go sendToService(service, proxy, router.Timeout, message, *params)
for i, service := range router.services {
go sendToService(service, proxy, router.Timeout, message, *params, router.serviceIds[i])
}

go func() {
Expand All @@ -105,20 +123,22 @@ func (router *ServiceRouter) SendAsync(message string, params *t.Params) chan er
return errors
}

func sendToService(service t.Service, results chan error, timeout time.Duration, message string, params t.Params) {
func sendToService(service t.Service, results chan error, timeout time.Duration, message string, params t.Params, serviceId string) {
result := make(chan error)

// TODO: There really ought to be a better way to name the services
pkg := reflect.TypeOf(service).Elem().PkgPath()
serviceName := pkg[strings.LastIndex(pkg, "/")+1:]

go func() { result <- service.Send(message, &params) }()
go func() {
err := service.Send(message, &params)
if err != nil {
err = fmt.Errorf("%v: %v", serviceId, err)
}
result <- err
}()

select {
case res := <-result:
results <- res
case <-time.After(timeout):
results <- fmt.Errorf("failed to send using %v: timed out", serviceName)
results <- fmt.Errorf("failed to send using %v: timed out", serviceId)
}
}

Expand Down Expand Up @@ -173,37 +193,34 @@ func (router *ServiceRouter) Route(rawURL string, message string) error {
return service.Send(message, nil)
}

func (router *ServiceRouter) initService(rawURL string) (t.Service, error) {
func (router *ServiceRouter) initService(rawURL string) (service t.Service, scheme string, err error) {

scheme, configURL, err := router.ExtractServiceName(rawURL)
var configURL *url.URL
scheme, configURL, err = router.ExtractServiceName(rawURL)
if err != nil {
return nil, err
return
}

service, err := newService(scheme)
service, err = newService(scheme)
if err != nil {
return nil, err
return
}

if configURL.Scheme != scheme {
router.log("Got custom URL:", configURL.String())
customURLService, ok := service.(t.CustomURLService)
if !ok {
return nil, fmt.Errorf("custom URLs are not supported by '%s' service", scheme)
return nil, scheme, fmt.Errorf("custom URLs are not supported by '%s' service", scheme)
}
configURL, err = customURLService.GetConfigURLFromCustom(configURL)
if err != nil {
return nil, err
return
}
router.log("Converted service URL:", configURL.String())
}

err = service.Initialize(configURL, router.logger)
if err != nil {
return service, err
}

return service, nil
return
}

// NewService returns a new uninitialized service instance
Expand Down Expand Up @@ -233,9 +250,14 @@ func (router *ServiceRouter) ListServices() []string {
return services
}

// ListAddedServices returns a list of the scheme identifiers of the added services
func (router *ServiceRouter) ListAddedServices() []string {
return router.serviceIds
}

// Locate returns the service implementation that corresponds to the given service URL
func (router *ServiceRouter) Locate(rawURL string) (t.Service, error) {
service, err := router.initService(rawURL)
service, _, err := router.initService(rawURL)
return service, err
}

Expand Down
22 changes: 18 additions & 4 deletions pkg/router/router_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,24 @@ var _ = Describe("the router suite", func() {

When("initializing a service with a custom URL", func() {
It("should return an error if the service does not support it", func() {
service, err := sr.initService("log+https://hybr.is")
service, scheme, err := sr.initService("logger+https://hybr.is")
Expect(err).To(HaveOccurred())
Expect(scheme).To(Equal("logger"))
Expect(service).To(BeNil())
})
})

When("listing added services", func() {
When("multiple instances of the same service have been added", func() {
It("should return a list with unique identifiers for those services", func() {
Expect(sr.AddService("logger://")).To(Succeed())
Expect(sr.AddService("logger://")).To(Succeed())
Expect(sr.AddService("logger://")).To(Succeed())
Expect(sr.ListAddedServices()).To(ConsistOf("logger", "logger2", "logger3"))
})
})
})

Describe("the service map", func() {
When("resolving implemented services", func() {
services := (&ServiceRouter{}).ListServices()
Expand All @@ -83,13 +95,15 @@ var _ = Describe("the router suite", func() {

When("initializing a service with a custom URL", func() {
It("should return an error if the service does not support it", func() {
service, err := sr.initService("log+https://hybr.is")
service, scheme, err := sr.initService("logger+https://hybr.is")
Expect(err).To(HaveOccurred())
Expect(scheme).To(Equal("logger"))
Expect(service).To(BeNil())
})
It("should successfully init a service that does support it", func() {
service, err := sr.initService(mockCustomURL)
service, scheme, err := sr.initService(mockCustomURL)
Expect(err).NotTo(HaveOccurred())
Expect(scheme).To(Equal("teams"))
Expect(service).NotTo(BeNil())
})
})
Expand Down Expand Up @@ -120,7 +134,7 @@ var _ = Describe("the router suite", func() {
When("router has not been provided a logger", func() {
It("should not crash when trying to log", func() {
router := ServiceRouter{}
_, err := router.initService(mockCustomURL)
_, _, err := router.initService(mockCustomURL)
Expect(err).NotTo(HaveOccurred())
})
})
Expand Down