Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create a Hub interface #272

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _examples/helloworld/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {
}
}

func initHub() (*eventhub.Hub, []string) {
func initHub() (eventhub.Hub, []string) {
namespace := mustGetenv("EVENTHUB_NAMESPACE")
hubMgmt, err := ensureEventHub(context.Background(), HubName)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion _examples/helloworld/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func main() {
}
}

func initHub() (*eventhub.Hub, []string) {
func initHub() (eventhub.Hub, []string) {
namespace := mustGetenv("EVENTHUB_NAMESPACE")
hubMgmt, err := ensureEventHub(context.Background(), HubName)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## Unreleased

- Move Hub to an interface (#272)

## `v3.3.18`

- Fixing issue where the LeaserCheckpointer could fail with a "ContainerAlreadyExists" error. (#253)
Expand Down
2 changes: 1 addition & 1 deletion eph/eph.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type (
name string
consumerGroup string
tokenProvider auth.TokenProvider
client *eventhub.Hub
client eventhub.Hub
leaser Leaser
checkpointer Checkpointer
scheduler *scheduler
Expand Down
4 changes: 2 additions & 2 deletions eph/eph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,15 @@ func (s *testSuite) newInMemoryEPHWithOptions(hubName string, store *sharedStore
return processor, nil
}

func (s *testSuite) newClient(t *testing.T, hubName string, opts ...eventhub.HubOption) *eventhub.Hub {
func (s *testSuite) newClient(t *testing.T, hubName string, opts ...eventhub.HubOption) eventhub.Hub {
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars(), aad.JWTProviderWithAzureEnvironment(&s.Env))
if err != nil {
t.Fatal(err)
}
return s.newClientWithProvider(t, hubName, provider, opts...)
}

func (s *testSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...eventhub.HubOption) *eventhub.Hub {
func (s *testSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...eventhub.HubOption) eventhub.Hub {
opts = append(opts, eventhub.HubWithEnvironment(s.Env))
client, err := eventhub.NewHub(s.Namespace, hubName, provider, opts...)
if err != nil {
Expand Down
127 changes: 92 additions & 35 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,39 @@ const (

type (
// Hub provides the ability to send and receive Event Hub messages
Hub struct {
Hub interface {
// GetRuntimeInformation fetches runtime information from the Event Hub management node
GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error)
// GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node
GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error)
// Close drains and closes all of the existing senders, receivers and connections
Close(ctx context.Context) error
// Receive subscribes for messages sent to the provided entityPath.
//
// The context passed into Receive is only used to limit the amount of time the caller will wait for the Receive
// method to connect to the Event Hub. The context passed in does not control the lifetime of Receive after connection.
//
// If Receive encounters an initial error setting up the connection, an error will be returned.
//
// If Receive starts successfully, a *ListenerHandle and a nil error will be returned. The ListenerHandle exposes
// methods which will help manage the life span of the receiver.
//
// # ListenerHandle.Close(ctx) closes the receiver
//
// # ListenerHandle.Done() signals the consumer when the receiver has stopped
//
// ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from
Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error)
// Send sends an event to the Event Hub
//
// Send will retry sending the message for as long as the context allows
Send(ctx context.Context, event *Event, opts ...SendOption) error
// SendBatch sends a batch of events to the Hub
SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error
}

// Hub provides the ability to send and receive Event Hub messages
hubImpl struct {
name string
namespace *namespace
receivers map[string]*receiver
Expand Down Expand Up @@ -92,7 +124,7 @@ type (

// HubOption provides structure for configuring new Event Hub clients. For building new Event Hubs, see
// HubManagementOption.
HubOption func(h *Hub) error
HubOption func(h Hub) error

// HubManager provides CRUD functionality for Event Hubs
HubManager struct {
Expand Down Expand Up @@ -336,7 +368,7 @@ func hubEntryToEntity(entry *hubEntry) *HubEntity {
// NOTE: If the AZURE_ENVIRONMENT variable is set, it will be used to set the ServiceBusEndpointSuffix
// from the corresponding azure.Environment type at the end of the namespace host string. The default
// is azure.PublicCloud.
func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error) {
func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (Hub, error) {
env := azure.PublicCloud
if e := os.Getenv("AZURE_ENVIRONMENT"); e != "" {
var err error
Expand All @@ -350,7 +382,7 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu
return nil, err
}

h := &Hub{
h := &hubImpl{
name: name,
namespace: ns,
offsetPersister: persist.NewMemoryPersister(),
Expand Down Expand Up @@ -385,9 +417,7 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu
// 2) Expected Environment Variable:
// - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
//
//
// AAD TokenProvider environment variables:
//
// 1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
// "AZURE_CLIENT_SECRET"
//
Expand All @@ -397,9 +427,8 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP
// and port. See: adal.GetMSIVMEndpoint()
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.
func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error) {
func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (Hub, error) {
var provider auth.TokenProvider
provider, sasErr := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())
if sasErr == nil {
Expand All @@ -420,7 +449,6 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp
// - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
// - "EVENTHUB_NAME" the name of the Event Hub instance
//
//
// This method depends on NewHubWithNamespaceNameAndEnvironment which will attempt to build a token provider from
// environment variables. If unable to build a AAD Token Provider it will fall back to a SAS token provider. If neither
// can be built, it will return error.
Expand All @@ -437,7 +465,6 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp
// 2) Expected Environment Variable:
// - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
//
//
// AAD TokenProvider environment variables:
// 1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
// "AZURE_CLIENT_SECRET"
Expand All @@ -449,7 +476,7 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.
func NewHubFromEnvironment(opts ...HubOption) (*Hub, error) {
func NewHubFromEnvironment(opts ...HubOption) (Hub, error) {
const envErrMsg = "environment var %s must not be empty"
var namespace, name string

Expand All @@ -467,8 +494,38 @@ func NewHubFromEnvironment(opts ...HubOption) (*Hub, error) {
// NewHubFromConnectionString creates a new Event Hub client for sending and receiving messages from a connection string
// formatted like the following:
//
// Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error) {
// Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
func NewHubFromConnectionString(connStr string, opts ...HubOption) (Hub, error) {
parsed, err := conn.ParsedConnectionFromStr(connStr)
if err != nil {
return nil, err
}

ns, err := newNamespace(namespaceWithConnectionString(connStr))
if err != nil {
return nil, err
}

h := &hubImpl{
name: parsed.HubName,
namespace: ns,
offsetPersister: persist.NewMemoryPersister(),
userAgent: rootUserAgent,
receivers: make(map[string]*receiver),
senderRetryOptions: newSenderRetryOptions(),
}

for _, opt := range opts {
err := opt(h)
if err != nil {
return nil, err
}
}

return h, err
}

func NewMockHubFromConnectionString(connStr string, opts ...HubOption) (Hub, error) {
parsed, err := conn.ParsedConnectionFromStr(connStr)
if err != nil {
return nil, err
Expand All @@ -479,7 +536,7 @@ func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error)
return nil, err
}

h := &Hub{
h := &hubImpl{
name: parsed.HubName,
namespace: ns,
offsetPersister: persist.NewMemoryPersister(),
Expand All @@ -499,7 +556,7 @@ func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error)
}

// GetRuntimeInformation fetches runtime information from the Event Hub management node
func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error) {
func (h *hubImpl) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error) {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetRuntimeInformation")
defer span.End()
client := newClient(h.namespace, h.name)
Expand All @@ -525,7 +582,7 @@ func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation
}

// GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node
func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error) {
func (h *hubImpl) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error) {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetPartitionInformation")
defer span.End()
client := newClient(h.namespace, h.name)
Expand All @@ -550,7 +607,7 @@ func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (
}

// Close drains and closes all of the existing senders, receivers and connections
func (h *Hub) Close(ctx context.Context) error {
func (h *hubImpl) Close(ctx context.Context) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Close")
defer span.End()

Expand Down Expand Up @@ -582,7 +639,7 @@ func (h *Hub) Close(ctx context.Context) error {
}

// closeReceivers will close the receivers on the hub and return the last error
func (h *Hub) closeReceivers(ctx context.Context) error {
func (h *hubImpl) closeReceivers(ctx context.Context) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.closeReceivers")
defer span.End()

Expand Down Expand Up @@ -611,7 +668,7 @@ func (h *Hub) closeReceivers(ctx context.Context) error {
// ListenerHandle.Done() signals the consumer when the receiver has stopped
//
// ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from
func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error) {
func (h *hubImpl) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error) {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Receive")
defer span.End()

Expand Down Expand Up @@ -639,7 +696,7 @@ func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler,
// Send sends an event to the Event Hub
//
// Send will retry sending the message for as long as the context allows
func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error {
func (h *hubImpl) Send(ctx context.Context, event *Event, opts ...SendOption) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Send")
defer span.End()

Expand All @@ -652,7 +709,7 @@ func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error
}

// SendBatch sends a batch of events to the Hub
func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error {
func (h *hubImpl) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.SendBatch")
defer span.End()

Expand Down Expand Up @@ -698,17 +755,17 @@ func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...Bat

// HubWithPartitionedSender configures the Hub instance to send to a specific event Hub partition
func HubWithPartitionedSender(partitionID string) HubOption {
return func(h *Hub) error {
h.senderPartitionID = &partitionID
return func(h Hub) error {
h.(*hubImpl).senderPartitionID = &partitionID
return nil
}
}

// HubWithOffsetPersistence configures the Hub instance to read and write offsets so that if a Hub is interrupted, it
// can resume after the last consumed event.
func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption {
return func(h *Hub) error {
h.offsetPersister = offsetPersister
return func(h Hub) error {
h.(*hubImpl).offsetPersister = offsetPersister
return nil
}
}
Expand All @@ -719,25 +776,25 @@ func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOp
//
// Max user agent length is specified by the const maxUserAgentLen.
func HubWithUserAgent(userAgent string) HubOption {
return func(h *Hub) error {
return h.appendAgent(userAgent)
return func(h Hub) error {
return h.(*hubImpl).appendAgent(userAgent)
}
}

// HubWithEnvironment configures the Hub to use the specified environment.
//
// By default, the Hub instance will use Azure US Public cloud environment
func HubWithEnvironment(env azure.Environment) HubOption {
return func(h *Hub) error {
h.namespace.host = "amqps://" + h.namespace.name + "." + env.ServiceBusEndpointSuffix
return func(h Hub) error {
h.(*hubImpl).namespace.host = "amqps://" + h.(*hubImpl).namespace.name + "." + env.ServiceBusEndpointSuffix
return nil
}
}

// HubWithWebSocketConnection configures the Hub to use a WebSocket connection wss:// rather than amqps://
func HubWithWebSocketConnection() HubOption {
return func(h *Hub) error {
h.namespace.useWebSocket = true
return func(h Hub) error {
h.(*hubImpl).namespace.useWebSocket = true
return nil
}
}
Expand All @@ -746,13 +803,13 @@ func HubWithWebSocketConnection() HubOption {
// in addition to the original attempt.
// 0 indicates no retries, and < 0 will cause infinite retries.
func HubWithSenderMaxRetryCount(maxRetryCount int) HubOption {
return func(h *Hub) error {
h.senderRetryOptions.maxRetries = maxRetryCount
return func(h Hub) error {
h.(*hubImpl).senderRetryOptions.maxRetries = maxRetryCount
return nil
}
}

func (h *Hub) appendAgent(userAgent string) error {
func (h *hubImpl) appendAgent(userAgent string) error {
ua := path.Join(h.userAgent, userAgent)
if len(ua) > maxUserAgentLen {
return fmt.Errorf("user agent string has surpassed the max length of %d", maxUserAgentLen)
Expand All @@ -761,7 +818,7 @@ func (h *Hub) appendAgent(userAgent string) error {
return nil
}

func (h *Hub) getSender(ctx context.Context) (*sender, error) {
func (h *hubImpl) getSender(ctx context.Context) (*sender, error) {
h.senderMu.Lock()
defer h.senderMu.Unlock()

Expand Down
Loading