From 8e87ca058b7d4acf084ecf48cce299029ca0cee3 Mon Sep 17 00:00:00 2001 From: "Alessandro (Ale) Segala" <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 3 Jun 2022 12:56:45 -0700 Subject: [PATCH 1/2] Fix goroutine leak on closing --- eph/eph.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eph/eph.go b/eph/eph.go index fda6965e..cbaf0287 100644 --- a/eph/eph.go +++ b/eph/eph.go @@ -37,7 +37,7 @@ import ( "github.com/Azure/azure-amqp-common-go/v3/conn" "github.com/Azure/azure-amqp-common-go/v3/sas" "github.com/Azure/azure-amqp-common-go/v3/uuid" - "github.com/Azure/azure-event-hubs-go/v3" + eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-event-hubs-go/v3/persist" "github.com/Azure/go-autorest/autorest/azure" @@ -318,7 +318,7 @@ func (h *EventProcessorHost) Start(ctx context.Context) error { go func() { span := tab.FromContext(ctx) - ctx := tab.NewContext(context.Background(), span) + ctx := tab.NewContext(ctx, span) h.scheduler.Run(ctx) }() @@ -344,7 +344,7 @@ func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error { go func() { span := tab.FromContext(ctx) - ctx := tab.NewContext(context.Background(), span) + ctx := tab.NewContext(ctx, span) h.scheduler.Run(ctx) }() From 02450e9d324520b6f208727d02c3e58b89023ccf Mon Sep 17 00:00:00 2001 From: Joel Hendrix Date: Tue, 16 Aug 2022 15:50:39 -0700 Subject: [PATCH 2/2] doc that calling Close() is required avoid goroutine leak when Close() is called --- eph/eph.go | 8 +++++--- eph/scheduler.go | 6 ++++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/eph/eph.go b/eph/eph.go index 0ac98b7d..612484df 100644 --- a/eph/eph.go +++ b/eph/eph.go @@ -299,6 +299,7 @@ func (h *EventProcessorHost) UnregisterHandler(ctx context.Context, id HandlerID } // Start begins processing of messages for registered handlers on the EventHostProcessor. The call is blocking. +// You MUST call Close() when the event processor host is no longer required. func (h *EventProcessorHost) Start(ctx context.Context) error { span, ctx := startConsumerSpanFromContext(ctx, "eph.EventProcessorHost.Start") defer span.End() @@ -318,7 +319,7 @@ func (h *EventProcessorHost) Start(ctx context.Context) error { go func() { span := tab.FromContext(ctx) - ctx := tab.NewContext(ctx, span) + ctx := tab.NewContext(context.Background(), span) h.scheduler.Run(ctx) }() @@ -329,7 +330,8 @@ func (h *EventProcessorHost) Start(ctx context.Context) error { return h.Close(ctx) } -// StartNonBlocking begins processing of messages for registered handlers +// StartNonBlocking begins processing of messages for registered handlers. +// You MUST call Close() when the event processor host is no longer required. func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error { span, ctx := startConsumerSpanFromContext(ctx, "eph.EventProcessorHost.StartNonBlocking") defer span.End() @@ -344,7 +346,7 @@ func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error { go func() { span := tab.FromContext(ctx) - ctx := tab.NewContext(ctx, span) + ctx := tab.NewContext(context.Background(), span) h.scheduler.Run(ctx) }() diff --git a/eph/scheduler.go b/eph/scheduler.go index 695b9341..ffe1b051 100644 --- a/eph/scheduler.go +++ b/eph/scheduler.go @@ -56,6 +56,7 @@ type ( done func() leaseRenewalInterval time.Duration receiverMu sync.Mutex + close chan struct{} } ownerCount struct { @@ -69,6 +70,7 @@ func newScheduler(eventHostProcessor *EventProcessorHost) *scheduler { processor: eventHostProcessor, receivers: make(map[string]*leasedReceiver), leaseRenewalInterval: DefaultLeaseRenewalInterval, + close: make(chan struct{}), } } @@ -83,6 +85,9 @@ func (s *scheduler) Run(ctx context.Context) { case <-ctx.Done(): s.dlog(ctx, "shutting down scan") return + case <-s.close: + s.dlog(ctx, "shutting down scan") + return default: s.scan(ctx) skew := time.Duration(rand.Intn(1000)-500) * time.Millisecond @@ -193,6 +198,7 @@ func (s *scheduler) Stop(ctx context.Context) error { _, _ = s.processor.leaser.ReleaseLease(ctx, lr.lease.GetPartitionID()) } + close(s.close) return lastErr }