diff --git a/eph/eph.go b/eph/eph.go index dfcac9f6..612484df 100644 --- a/eph/eph.go +++ b/eph/eph.go @@ -299,6 +299,7 @@ func (h *EventProcessorHost) UnregisterHandler(ctx context.Context, id HandlerID } // Start begins processing of messages for registered handlers on the EventHostProcessor. The call is blocking. +// You MUST call Close() when the event processor host is no longer required. func (h *EventProcessorHost) Start(ctx context.Context) error { span, ctx := startConsumerSpanFromContext(ctx, "eph.EventProcessorHost.Start") defer span.End() @@ -329,7 +330,8 @@ func (h *EventProcessorHost) Start(ctx context.Context) error { return h.Close(ctx) } -// StartNonBlocking begins processing of messages for registered handlers +// StartNonBlocking begins processing of messages for registered handlers. +// You MUST call Close() when the event processor host is no longer required. func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error { span, ctx := startConsumerSpanFromContext(ctx, "eph.EventProcessorHost.StartNonBlocking") defer span.End() diff --git a/eph/scheduler.go b/eph/scheduler.go index 695b9341..ffe1b051 100644 --- a/eph/scheduler.go +++ b/eph/scheduler.go @@ -56,6 +56,7 @@ type ( done func() leaseRenewalInterval time.Duration receiverMu sync.Mutex + close chan struct{} } ownerCount struct { @@ -69,6 +70,7 @@ func newScheduler(eventHostProcessor *EventProcessorHost) *scheduler { processor: eventHostProcessor, receivers: make(map[string]*leasedReceiver), leaseRenewalInterval: DefaultLeaseRenewalInterval, + close: make(chan struct{}), } } @@ -83,6 +85,9 @@ func (s *scheduler) Run(ctx context.Context) { case <-ctx.Done(): s.dlog(ctx, "shutting down scan") return + case <-s.close: + s.dlog(ctx, "shutting down scan") + return default: s.scan(ctx) skew := time.Duration(rand.Intn(1000)-500) * time.Millisecond @@ -193,6 +198,7 @@ func (s *scheduler) Stop(ctx context.Context) error { _, _ = s.processor.leaser.ReleaseLease(ctx, lr.lease.GetPartitionID()) } + close(s.close) return lastErr }