From 57e80f6330a0d53f771cfb37e732c888b3ff4613 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Sat, 13 Aug 2022 13:36:51 -0700 Subject: [PATCH] create a Hub interface --- _examples/helloworld/consumer/main.go | 2 +- _examples/helloworld/producer/main.go | 2 +- changelog.md | 4 + eph/eph.go | 2 +- eph/eph_test.go | 4 +- hub.go | 127 ++++++++++++++++------- hub_test.go | 57 +++++----- internal/stress/throttling/throttling.go | 6 +- races_test.go | 4 +- receiver.go | 6 +- sender.go | 4 +- sender_test.go | 2 +- storage/eph_test.go | 4 +- tracing.go | 2 +- 14 files changed, 146 insertions(+), 80 deletions(-) diff --git a/_examples/helloworld/consumer/main.go b/_examples/helloworld/consumer/main.go index fe6a91e3..893576cc 100644 --- a/_examples/helloworld/consumer/main.go +++ b/_examples/helloworld/consumer/main.go @@ -58,7 +58,7 @@ func main() { } } -func initHub() (*eventhub.Hub, []string) { +func initHub() (eventhub.Hub, []string) { namespace := mustGetenv("EVENTHUB_NAMESPACE") hubMgmt, err := ensureEventHub(context.Background(), HubName) if err != nil { diff --git a/_examples/helloworld/producer/main.go b/_examples/helloworld/producer/main.go index a43b69a4..0cb2cf81 100644 --- a/_examples/helloworld/producer/main.go +++ b/_examples/helloworld/producer/main.go @@ -41,7 +41,7 @@ func main() { } } -func initHub() (*eventhub.Hub, []string) { +func initHub() (eventhub.Hub, []string) { namespace := mustGetenv("EVENTHUB_NAMESPACE") hubMgmt, err := ensureEventHub(context.Background(), HubName) if err != nil { diff --git a/changelog.md b/changelog.md index ac72c7c1..b8e5f3d3 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,9 @@ # Change Log +## Unreleased + +- Move Hub to an interface (#272) + ## `v3.3.18` - Fixing issue where the LeaserCheckpointer could fail with a "ContainerAlreadyExists" error. (#253) diff --git a/eph/eph.go b/eph/eph.go index dfcac9f6..917db4c2 100644 --- a/eph/eph.go +++ b/eph/eph.go @@ -65,7 +65,7 @@ type ( name string consumerGroup string tokenProvider auth.TokenProvider - client *eventhub.Hub + client eventhub.Hub leaser Leaser checkpointer Checkpointer scheduler *scheduler diff --git a/eph/eph_test.go b/eph/eph_test.go index bcd53686..b67e578c 100644 --- a/eph/eph_test.go +++ b/eph/eph_test.go @@ -258,7 +258,7 @@ func (s *testSuite) newInMemoryEPHWithOptions(hubName string, store *sharedStore return processor, nil } -func (s *testSuite) newClient(t *testing.T, hubName string, opts ...eventhub.HubOption) *eventhub.Hub { +func (s *testSuite) newClient(t *testing.T, hubName string, opts ...eventhub.HubOption) eventhub.Hub { provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars(), aad.JWTProviderWithAzureEnvironment(&s.Env)) if err != nil { t.Fatal(err) @@ -266,7 +266,7 @@ func (s *testSuite) newClient(t *testing.T, hubName string, opts ...eventhub.Hub return s.newClientWithProvider(t, hubName, provider, opts...) } -func (s *testSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...eventhub.HubOption) *eventhub.Hub { +func (s *testSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...eventhub.HubOption) eventhub.Hub { opts = append(opts, eventhub.HubWithEnvironment(s.Env)) client, err := eventhub.NewHub(s.Namespace, hubName, provider, opts...) if err != nil { diff --git a/hub.go b/hub.go index b38e2338..d7449520 100644 --- a/hub.go +++ b/hub.go @@ -57,7 +57,39 @@ const ( type ( // Hub provides the ability to send and receive Event Hub messages - Hub struct { + Hub interface { + // GetRuntimeInformation fetches runtime information from the Event Hub management node + GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error) + // GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node + GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error) + // Close drains and closes all of the existing senders, receivers and connections + Close(ctx context.Context) error + // Receive subscribes for messages sent to the provided entityPath. + // + // The context passed into Receive is only used to limit the amount of time the caller will wait for the Receive + // method to connect to the Event Hub. The context passed in does not control the lifetime of Receive after connection. + // + // If Receive encounters an initial error setting up the connection, an error will be returned. + // + // If Receive starts successfully, a *ListenerHandle and a nil error will be returned. The ListenerHandle exposes + // methods which will help manage the life span of the receiver. + // + // # ListenerHandle.Close(ctx) closes the receiver + // + // # ListenerHandle.Done() signals the consumer when the receiver has stopped + // + // ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from + Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error) + // Send sends an event to the Event Hub + // + // Send will retry sending the message for as long as the context allows + Send(ctx context.Context, event *Event, opts ...SendOption) error + // SendBatch sends a batch of events to the Hub + SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error + } + + // Hub provides the ability to send and receive Event Hub messages + hubImpl struct { name string namespace *namespace receivers map[string]*receiver @@ -92,7 +124,7 @@ type ( // HubOption provides structure for configuring new Event Hub clients. For building new Event Hubs, see // HubManagementOption. - HubOption func(h *Hub) error + HubOption func(h Hub) error // HubManager provides CRUD functionality for Event Hubs HubManager struct { @@ -336,7 +368,7 @@ func hubEntryToEntity(entry *hubEntry) *HubEntity { // NOTE: If the AZURE_ENVIRONMENT variable is set, it will be used to set the ServiceBusEndpointSuffix // from the corresponding azure.Environment type at the end of the namespace host string. The default // is azure.PublicCloud. -func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error) { +func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (Hub, error) { env := azure.PublicCloud if e := os.Getenv("AZURE_ENVIRONMENT"); e != "" { var err error @@ -350,7 +382,7 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu return nil, err } - h := &Hub{ + h := &hubImpl{ name: name, namespace: ns, offsetPersister: persist.NewMemoryPersister(), @@ -385,9 +417,7 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu // 2) Expected Environment Variable: // - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal // -// // AAD TokenProvider environment variables: -// // 1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and // "AZURE_CLIENT_SECRET" // @@ -397,9 +427,8 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu // 3. Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP // and port. See: adal.GetMSIVMEndpoint() // -// // The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var. -func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error) { +func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (Hub, error) { var provider auth.TokenProvider provider, sasErr := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars()) if sasErr == nil { @@ -420,7 +449,6 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp // - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance // - "EVENTHUB_NAME" the name of the Event Hub instance // -// // This method depends on NewHubWithNamespaceNameAndEnvironment which will attempt to build a token provider from // environment variables. If unable to build a AAD Token Provider it will fall back to a SAS token provider. If neither // can be built, it will return error. @@ -437,7 +465,6 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp // 2) Expected Environment Variable: // - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal // -// // AAD TokenProvider environment variables: // 1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and // "AZURE_CLIENT_SECRET" @@ -449,7 +476,7 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp // // // The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var. -func NewHubFromEnvironment(opts ...HubOption) (*Hub, error) { +func NewHubFromEnvironment(opts ...HubOption) (Hub, error) { const envErrMsg = "environment var %s must not be empty" var namespace, name string @@ -467,8 +494,38 @@ func NewHubFromEnvironment(opts ...HubOption) (*Hub, error) { // NewHubFromConnectionString creates a new Event Hub client for sending and receiving messages from a connection string // formatted like the following: // -// Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName -func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error) { +// Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName +func NewHubFromConnectionString(connStr string, opts ...HubOption) (Hub, error) { + parsed, err := conn.ParsedConnectionFromStr(connStr) + if err != nil { + return nil, err + } + + ns, err := newNamespace(namespaceWithConnectionString(connStr)) + if err != nil { + return nil, err + } + + h := &hubImpl{ + name: parsed.HubName, + namespace: ns, + offsetPersister: persist.NewMemoryPersister(), + userAgent: rootUserAgent, + receivers: make(map[string]*receiver), + senderRetryOptions: newSenderRetryOptions(), + } + + for _, opt := range opts { + err := opt(h) + if err != nil { + return nil, err + } + } + + return h, err +} + +func NewMockHubFromConnectionString(connStr string, opts ...HubOption) (Hub, error) { parsed, err := conn.ParsedConnectionFromStr(connStr) if err != nil { return nil, err @@ -479,7 +536,7 @@ func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error) return nil, err } - h := &Hub{ + h := &hubImpl{ name: parsed.HubName, namespace: ns, offsetPersister: persist.NewMemoryPersister(), @@ -499,7 +556,7 @@ func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error) } // GetRuntimeInformation fetches runtime information from the Event Hub management node -func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error) { +func (h *hubImpl) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error) { span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetRuntimeInformation") defer span.End() client := newClient(h.namespace, h.name) @@ -525,7 +582,7 @@ func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation } // GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node -func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error) { +func (h *hubImpl) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error) { span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetPartitionInformation") defer span.End() client := newClient(h.namespace, h.name) @@ -550,7 +607,7 @@ func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) ( } // Close drains and closes all of the existing senders, receivers and connections -func (h *Hub) Close(ctx context.Context) error { +func (h *hubImpl) Close(ctx context.Context) error { span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Close") defer span.End() @@ -582,7 +639,7 @@ func (h *Hub) Close(ctx context.Context) error { } // closeReceivers will close the receivers on the hub and return the last error -func (h *Hub) closeReceivers(ctx context.Context) error { +func (h *hubImpl) closeReceivers(ctx context.Context) error { span, ctx := h.startSpanFromContext(ctx, "eh.Hub.closeReceivers") defer span.End() @@ -611,7 +668,7 @@ func (h *Hub) closeReceivers(ctx context.Context) error { // ListenerHandle.Done() signals the consumer when the receiver has stopped // // ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from -func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error) { +func (h *hubImpl) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error) { span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Receive") defer span.End() @@ -639,7 +696,7 @@ func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, // Send sends an event to the Event Hub // // Send will retry sending the message for as long as the context allows -func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error { +func (h *hubImpl) Send(ctx context.Context, event *Event, opts ...SendOption) error { span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Send") defer span.End() @@ -652,7 +709,7 @@ func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error } // SendBatch sends a batch of events to the Hub -func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error { +func (h *hubImpl) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error { span, ctx := h.startSpanFromContext(ctx, "eh.Hub.SendBatch") defer span.End() @@ -698,8 +755,8 @@ func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...Bat // HubWithPartitionedSender configures the Hub instance to send to a specific event Hub partition func HubWithPartitionedSender(partitionID string) HubOption { - return func(h *Hub) error { - h.senderPartitionID = &partitionID + return func(h Hub) error { + h.(*hubImpl).senderPartitionID = &partitionID return nil } } @@ -707,8 +764,8 @@ func HubWithPartitionedSender(partitionID string) HubOption { // HubWithOffsetPersistence configures the Hub instance to read and write offsets so that if a Hub is interrupted, it // can resume after the last consumed event. func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption { - return func(h *Hub) error { - h.offsetPersister = offsetPersister + return func(h Hub) error { + h.(*hubImpl).offsetPersister = offsetPersister return nil } } @@ -719,8 +776,8 @@ func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOp // // Max user agent length is specified by the const maxUserAgentLen. func HubWithUserAgent(userAgent string) HubOption { - return func(h *Hub) error { - return h.appendAgent(userAgent) + return func(h Hub) error { + return h.(*hubImpl).appendAgent(userAgent) } } @@ -728,16 +785,16 @@ func HubWithUserAgent(userAgent string) HubOption { // // By default, the Hub instance will use Azure US Public cloud environment func HubWithEnvironment(env azure.Environment) HubOption { - return func(h *Hub) error { - h.namespace.host = "amqps://" + h.namespace.name + "." + env.ServiceBusEndpointSuffix + return func(h Hub) error { + h.(*hubImpl).namespace.host = "amqps://" + h.(*hubImpl).namespace.name + "." + env.ServiceBusEndpointSuffix return nil } } // HubWithWebSocketConnection configures the Hub to use a WebSocket connection wss:// rather than amqps:// func HubWithWebSocketConnection() HubOption { - return func(h *Hub) error { - h.namespace.useWebSocket = true + return func(h Hub) error { + h.(*hubImpl).namespace.useWebSocket = true return nil } } @@ -746,13 +803,13 @@ func HubWithWebSocketConnection() HubOption { // in addition to the original attempt. // 0 indicates no retries, and < 0 will cause infinite retries. func HubWithSenderMaxRetryCount(maxRetryCount int) HubOption { - return func(h *Hub) error { - h.senderRetryOptions.maxRetries = maxRetryCount + return func(h Hub) error { + h.(*hubImpl).senderRetryOptions.maxRetries = maxRetryCount return nil } } -func (h *Hub) appendAgent(userAgent string) error { +func (h *hubImpl) appendAgent(userAgent string) error { ua := path.Join(h.userAgent, userAgent) if len(ua) > maxUserAgentLen { return fmt.Errorf("user agent string has surpassed the max length of %d", maxUserAgentLen) @@ -761,7 +818,7 @@ func (h *Hub) appendAgent(userAgent string) error { return nil } -func (h *Hub) getSender(ctx context.Context) (*sender, error) { +func (h *hubImpl) getSender(ctx context.Context) (*sender, error) { h.senderMu.Lock() defer h.senderMu.Unlock() diff --git a/hub_test.go b/hub_test.go index 04b37f03..383758ae 100644 --- a/hub_test.go +++ b/hub_test.go @@ -234,7 +234,7 @@ func (suite *eventHubSuite) randEntityName() string { } func (suite *eventHubSuite) TestSasToken() { - tests := map[string]func(context.Context, *testing.T, *Hub, []string, string){ + tests := map[string]func(context.Context, *testing.T, *hubImpl, []string, string){ "TestMultiSendAndReceive": testMultiSendAndReceive, "TestHubRuntimeInformation": testHubRuntimeInformation, "TestHubPartitionRuntimeInformation": testHubPartitionRuntimeInformation, @@ -258,7 +258,7 @@ func (suite *eventHubSuite) TestSasToken() { } func (suite *eventHubSuite) TestPartitioned() { - tests := map[string]func(context.Context, *testing.T, *Hub, string){ + tests := map[string]func(context.Context, *testing.T, *hubImpl, string){ "TestSend": testBasicSend, "TestSendTooBig": testSendTooBig, "TestSendAndReceive": testBasicSendAndReceive, @@ -283,7 +283,7 @@ func (suite *eventHubSuite) TestPartitioned() { } func (suite *eventHubSuite) TestWebSocket() { - tests := map[string]func(context.Context, *testing.T, *Hub, string){ + tests := map[string]func(context.Context, *testing.T, *hubImpl, string){ "TestSend": testBasicSend, "TestSendTooBig": testSendTooBig, "TestSendAndReceive": testBasicSendAndReceive, @@ -350,12 +350,12 @@ func (suite *eventHubSuite) TestSenderRetryOptionsThroughHub() { } } -func testBasicSend(ctx context.Context, t *testing.T, client *Hub, _ string) { +func testBasicSend(ctx context.Context, t *testing.T, client *hubImpl, _ string) { err := client.Send(ctx, NewEventFromString("Hello!")) assert.NoError(t, err) } -func testSendTooBig(ctx context.Context, t *testing.T, client *Hub, _ string) { +func testSendTooBig(ctx context.Context, t *testing.T, client *hubImpl, _ string) { data := make([]byte, 2600*1024) _, _ = rand.Read(data) event := NewEvent(data) @@ -363,7 +363,7 @@ func testSendTooBig(ctx context.Context, t *testing.T, client *Hub, _ string) { assert.Error(t, err, "encoded message size exceeds max of 1048576") } -func testBatchSendAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionID string) { +func testBatchSendAndReceive(ctx context.Context, t *testing.T, client *hubImpl, partitionID string) { messages := []string{"hello", "world", "foo", "bar", "baz", "buzz"} var wg sync.WaitGroup wg.Add(len(messages)) @@ -389,7 +389,7 @@ func testBatchSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par } } -func testBatchSendTooLarge(ctx context.Context, t *testing.T, client *Hub, _ string) { +func testBatchSendTooLarge(ctx context.Context, t *testing.T, client *hubImpl, _ string) { events := make([]*Event, 200000) var wg sync.WaitGroup @@ -403,7 +403,7 @@ func testBatchSendTooLarge(ctx context.Context, t *testing.T, client *Hub, _ str assert.EqualError(t, client.SendBatch(ctx, ebi, BatchWithMaxSizeInBytes(10000000)), "encoded message size exceeds max of 1048576") } -func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionID string) { +func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *hubImpl, partitionID string) { numMessages := rand.Intn(100) + 20 var wg sync.WaitGroup wg.Add(numMessages) @@ -449,7 +449,7 @@ func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par } func (suite *eventHubSuite) TestEpochReceivers() { - tests := map[string]func(context.Context, *testing.T, *Hub, []string, string){ + tests := map[string]func(context.Context, *testing.T, *hubImpl, []string, string){ "TestEpochGreaterThenLess": testEpochGreaterThenLess, "TestEpochLessThenGreater": testEpochLessThenGreater, } @@ -472,7 +472,7 @@ func (suite *eventHubSuite) TestEpochReceivers() { } } -func testEpochGreaterThenLess(ctx context.Context, t *testing.T, client *Hub, partitionIDs []string, _ string) { +func testEpochGreaterThenLess(ctx context.Context, t *testing.T, client *hubImpl, partitionIDs []string, _ string) { partitionID := partitionIDs[0] r1, err := client.Receive(ctx, partitionID, func(c context.Context, event *Event) error { return nil }, ReceiveWithEpoch(4)) if !assert.NoError(t, err) { @@ -485,7 +485,7 @@ func testEpochGreaterThenLess(ctx context.Context, t *testing.T, client *Hub, pa assert.NoError(t, r1.Err(), "r1 should still be running with the higher epoch") } -func testEpochLessThenGreater(ctx context.Context, t *testing.T, client *Hub, partitionIDs []string, _ string) { +func testEpochLessThenGreater(ctx context.Context, t *testing.T, client *hubImpl, partitionIDs []string, _ string) { partitionID := partitionIDs[0] r1, err := client.Receive(ctx, partitionID, func(c context.Context, event *Event) error { return nil }, ReceiveWithEpoch(1)) if !assert.NoError(t, err) { @@ -509,7 +509,7 @@ func testEpochLessThenGreater(ctx context.Context, t *testing.T, client *Hub, pa } func (suite *eventHubSuite) TestMultiPartition() { - tests := map[string]func(context.Context, *testing.T, *Hub, []string, string){ + tests := map[string]func(context.Context, *testing.T, *hubImpl, []string, string){ "TestMultiSendAndReceive": testMultiSendAndReceive, "TestSendWithPartitionKeyAndReceive": testSendWithPartitionKeyAndReceive, } @@ -528,7 +528,7 @@ func (suite *eventHubSuite) TestMultiPartition() { } } -func testMultiSendAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionIDs []string, _ string) { +func testMultiSendAndReceive(ctx context.Context, t *testing.T, client *hubImpl, partitionIDs []string, _ string) { numMessages := rand.Intn(100) + 20 var wg sync.WaitGroup wg.Add(numMessages) @@ -555,7 +555,7 @@ func testMultiSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par waitUntil(t, &wg, time.Until(end)) } -func testSendWithPartitionKeyAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionIDs []string, _ string) { +func testSendWithPartitionKeyAndReceive(ctx context.Context, t *testing.T, client *hubImpl, partitionIDs []string, _ string) { numMessages := rand.Intn(100) + 20 var wg sync.WaitGroup wg.Add(numMessages) @@ -613,7 +613,7 @@ func testSendWithPartitionKeyAndReceive(ctx context.Context, t *testing.T, clien } func (suite *eventHubSuite) TestHubManagement() { - tests := map[string]func(context.Context, *testing.T, *Hub, []string, string){ + tests := map[string]func(context.Context, *testing.T, *hubImpl, []string, string){ "TestHubRuntimeInformation": testHubRuntimeInformation, "TestHubPartitionRuntimeInformation": testHubPartitionRuntimeInformation, } @@ -633,7 +633,7 @@ func (suite *eventHubSuite) TestHubManagement() { } } -func testHubRuntimeInformation(ctx context.Context, t *testing.T, client *Hub, partitionIDs []string, hubName string) { +func testHubRuntimeInformation(ctx context.Context, t *testing.T, client *hubImpl, partitionIDs []string, hubName string) { info, err := client.GetRuntimeInformation(ctx) if assert.NoError(t, err) { assert.Equal(t, len(partitionIDs), info.PartitionCount) @@ -641,7 +641,7 @@ func testHubRuntimeInformation(ctx context.Context, t *testing.T, client *Hub, p } } -func testHubPartitionRuntimeInformation(ctx context.Context, t *testing.T, client *Hub, partitionIDs []string, hubName string) { +func testHubPartitionRuntimeInformation(ctx context.Context, t *testing.T, client *hubImpl, partitionIDs []string, hubName string) { info, err := client.GetPartitionInformation(ctx, partitionIDs[0]) if assert.NoError(t, err) { assert.Equal(t, hubName, info.HubPath) @@ -657,7 +657,7 @@ func TestEnvironmentalCreation(t *testing.T) { require.NoError(t, os.Unsetenv("EVENTHUB_NAME")) } -func (suite *eventHubSuite) newClient(t *testing.T, hubName string, opts ...HubOption) (*Hub, func()) { +func (suite *eventHubSuite) newClient(t *testing.T, hubName string, opts ...HubOption) (*hubImpl, func()) { provider, err := aad.NewJWTProvider( aad.JWTProviderWithEnvironmentVars(), aad.JWTProviderWithAzureEnvironment(&suite.Env), @@ -668,13 +668,13 @@ func (suite *eventHubSuite) newClient(t *testing.T, hubName string, opts ...HubO return suite.newClientWithProvider(t, hubName, provider, opts...) } -func (suite *eventHubSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...HubOption) (*Hub, func()) { +func (suite *eventHubSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...HubOption) (*hubImpl, func()) { opts = append(opts, HubWithEnvironment(suite.Env)) client, err := NewHub(suite.Namespace, hubName, provider, opts...) if !suite.NoError(err) { - suite.FailNow("unable to make a new Hub") + suite.FailNow("unable to make a new hubImpl") } - return client, func() { + return client.(*hubImpl), func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _ = client.Close(ctx) @@ -726,7 +726,8 @@ func (suite *eventHubSuite) captureEnv() func() { func TestNewHub_withAzureEnvironmentVariable(t *testing.T) { _ = os.Setenv("AZURE_ENVIRONMENT", "AZURECHINACLOUD") - h, err := NewHub("test", "test", &aad.TokenProvider{}) + hub, err := NewHub("test", "test", &aad.TokenProvider{}) + h := hub.(*hubImpl) if err != nil { t.Fatal(err) } @@ -734,7 +735,8 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) { t.Fatalf("did not set appropriate endpoint suffix. Expected: %v, Received: %v", azure.ChinaCloud.ServiceBusEndpointSuffix, h.namespace.host) } _ = os.Setenv("AZURE_ENVIRONMENT", "AZUREGERMANCLOUD") - h, err = NewHub("test", "test", &aad.TokenProvider{}) + hub, err = NewHub("test", "test", &aad.TokenProvider{}) + h = hub.(*hubImpl) if err != nil { t.Fatal(err) } @@ -742,7 +744,8 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) { t.Fatalf("did not set appropriate endpoint suffix. Expected: %v, Received: %v", azure.GermanCloud.ServiceBusEndpointSuffix, h.namespace.host) } _ = os.Setenv("AZURE_ENVIRONMENT", "AZUREUSGOVERNMENTCLOUD") - h, err = NewHub("test", "test", &aad.TokenProvider{}) + hub, err = NewHub("test", "test", &aad.TokenProvider{}) + h = hub.(*hubImpl) if err != nil { t.Fatal(err) } @@ -750,7 +753,8 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) { t.Fatalf("did not set appropriate endpoint suffix. Expected: %v, Received: %v", azure.USGovernmentCloud.ServiceBusEndpointSuffix, h.namespace.host) } _ = os.Setenv("AZURE_ENVIRONMENT", "AZUREPUBLICCLOUD") - h, err = NewHub("test", "test", &aad.TokenProvider{}) + hub, err = NewHub("test", "test", &aad.TokenProvider{}) + h = hub.(*hubImpl) if err != nil { t.Fatal(err) } @@ -758,7 +762,8 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) { t.Fatalf("did not set appropriate endpoint suffix. Expected: %v, Received: %v", azure.PublicCloud.ServiceBusEndpointSuffix, h.namespace.host) } _ = os.Unsetenv("AZURE_ENVIRONMENT") - h, err = NewHub("test", "test", &aad.TokenProvider{}) + hub, err = NewHub("test", "test", &aad.TokenProvider{}) + h = hub.(*hubImpl) if err != nil { t.Fatal(err) } diff --git a/internal/stress/throttling/throttling.go b/internal/stress/throttling/throttling.go index 01b79de0..f9b5cf7a 100644 --- a/internal/stress/throttling/throttling.go +++ b/internal/stress/throttling/throttling.go @@ -51,7 +51,7 @@ func main() { verifyMessages(context.TODO(), hub, partitions, messageCount) } -func sendMessages(hub *eventhub.Hub) int64 { +func sendMessages(hub eventhub.Hub) int64 { var batches []eventhub.BatchIterator nextTestId := int64(0) @@ -112,7 +112,7 @@ func createEventBatch(testId *int64) eventhub.BatchIterator { return eventhub.NewEventBatchIterator(events...) } -func getPartitionCounts(ctx context.Context, hub *eventhub.Hub) map[string]*eventhub.HubPartitionRuntimeInformation { +func getPartitionCounts(ctx context.Context, hub eventhub.Hub) map[string]*eventhub.HubPartitionRuntimeInformation { partitions := map[string]*eventhub.HubPartitionRuntimeInformation{} runtimeInfo, err := hub.GetRuntimeInformation(ctx) @@ -134,7 +134,7 @@ func getPartitionCounts(ctx context.Context, hub *eventhub.Hub) map[string]*even return partitions } -func verifyMessages(ctx context.Context, hub *eventhub.Hub, partitions map[string]*eventhub.HubPartitionRuntimeInformation, expectedMessages int64) { +func verifyMessages(ctx context.Context, hub eventhub.Hub, partitions map[string]*eventhub.HubPartitionRuntimeInformation, expectedMessages int64) { after := time.After(time.Minute * 5) receiverCtx, cancel := context.WithCancel(ctx) diff --git a/races_test.go b/races_test.go index 7dea6014..a7007ba1 100644 --- a/races_test.go +++ b/races_test.go @@ -12,7 +12,7 @@ import ( ) func (suite *eventHubSuite) TestConcurrency() { - tests := map[string]func(context.Context, *testing.T, *Hub, string){ + tests := map[string]func(context.Context, *testing.T, *hubImpl, string){ "TestConcurrentSendWithRecover": testConcurrentSendWithRecover, } @@ -32,7 +32,7 @@ func (suite *eventHubSuite) TestConcurrency() { } } -func testConcurrentSendWithRecover(ctx context.Context, t *testing.T, client *Hub, _ string) { +func testConcurrentSendWithRecover(ctx context.Context, t *testing.T, client *hubImpl, _ string) { var wg sync.WaitGroup var err error for i := 0; i < 100; i++ { diff --git a/receiver.go b/receiver.go index 4469da2c..2e176ffc 100644 --- a/receiver.go +++ b/receiver.go @@ -51,7 +51,7 @@ const ( // receiver provides session and link handling for a receiving entity path type ( receiver struct { - hub *Hub + hub *hubImpl connection *amqp.Client session *session receiver *amqp.Receiver @@ -132,8 +132,8 @@ func ReceiveWithEpoch(epoch int64) ReceiveOption { } // newReceiver creates a new Service Bus message listener given an AMQP client and an entity path -func (h *Hub) newReceiver(ctx context.Context, partitionID string, opts ...ReceiveOption) (*receiver, error) { - span, ctx := h.startSpanFromContext(ctx, "eh.Hub.newReceiver") +func (h *hubImpl) newReceiver(ctx context.Context, partitionID string, opts ...ReceiveOption) (*receiver, error) { + span, ctx := h.startSpanFromContext(ctx, "eh.hubImpl.newReceiver") defer span.End() receiver := &receiver{ diff --git a/sender.go b/sender.go index c3903abb..75d61441 100644 --- a/sender.go +++ b/sender.go @@ -45,7 +45,7 @@ const ( // sender provides session and link handling for an sending entity path type ( sender struct { - hub *Hub + hub *hubImpl connection *amqp.Client session *session sender atomic.Value // holds a *amqp.Sender @@ -100,7 +100,7 @@ func newSenderRetryOptions() *senderRetryOptions { } // newSender creates a new Service Bus message sender given an AMQP client and entity path -func (h *Hub) newSender(ctx context.Context, retryOptions *senderRetryOptions) (*sender, error) { +func (h *hubImpl) newSender(ctx context.Context, retryOptions *senderRetryOptions) (*sender, error) { span, ctx := h.startSpanFromContext(ctx, "eh.sender.newSender") defer span.End() diff --git a/sender_test.go b/sender_test.go index 64ef9a02..cbbb3b3e 100644 --- a/sender_test.go +++ b/sender_test.go @@ -379,7 +379,7 @@ func (s *fakeSender) Close(ctx context.Context) error { func createRecoveryBlock1Sender(t *testing.T, afterBlock1 func(s *sender)) (func(), *sender) { s := &sender{ partitionID: to.StringPtr("0"), - hub: &Hub{ + hub: &hubImpl{ namespace: &namespace{}, }, } diff --git a/storage/eph_test.go b/storage/eph_test.go index 1235a2d7..a12ef515 100644 --- a/storage/eph_test.go +++ b/storage/eph_test.go @@ -255,13 +255,13 @@ func (ts *testSuite) newStorageBackedEPHOptions(hubName string, leaser eph.Lease return processor, nil } -func (ts *testSuite) newClient(t *testing.T, hubName string, opts ...eventhub.HubOption) *eventhub.Hub { +func (ts *testSuite) newClient(t *testing.T, hubName string, opts ...eventhub.HubOption) eventhub.Hub { provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars(), aad.JWTProviderWithAzureEnvironment(&ts.Env)) ts.Require().NoError(err) return ts.newClientWithProvider(t, hubName, provider, opts...) } -func (ts *testSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...eventhub.HubOption) *eventhub.Hub { +func (ts *testSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...eventhub.HubOption) eventhub.Hub { opts = append(opts, eventhub.HubWithEnvironment(ts.Env)) client, err := eventhub.NewHub(ts.Namespace, hubName, provider, opts...) ts.Require().NoError(err) diff --git a/tracing.go b/tracing.go index 700e6f68..6573ae8e 100644 --- a/tracing.go +++ b/tracing.go @@ -9,7 +9,7 @@ import ( "github.com/devigned/tab" ) -func (h *Hub) startSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) { +func (h *hubImpl) startSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) { ctx, span := tab.StartSpan(ctx, operationName) ApplyComponentInfo(span) return span, ctx