From f1800c2095fce9980440640469c94fd0f82f5ec2 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Wed, 1 Nov 2023 10:09:38 -0400 Subject: [PATCH] `KafkaConsumer`: back pressure + improved read speed (#139) * Add Back Pressure to `KafkaConsumer` Motivation: Closes #131. Modifications: * re-add `KafkaConsumerConfiguration.backPressureStrategy: BackPressureStrategy`, currently allowing users to add high-low-watermark backpressure to their `KafkaConsumer`s * `KafkaConsumer`: * make `KafkaConsumerMessages` use `NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark` as backpressure strategy * remove `rd_kafka_poll_set_consumer` -> use two separate queues for consumer events and consumer messages so we can exert backpressure on the consumer message queue * remove idle polling mechanism where incoming messages were discarded when `KafkaConsumerMessages` was terminated -> we now have two independent queues * rename `.pollForAndYieldMessage` -> `.pollForEventsAndMessages` * refactor `State` and add `ConsumerMessagesSequenceState` * `KafkaProducer`: * rename `.consumptionStopped` -> `.eventConsumptionFinished` * `RDKafkaClient`: * bring back `consumerPoll()` * `eventPoll()`: only queue main queue for events since consumer messages are now handled on a different queue * KafkaConsumer: two state machines Modifications: * have two state machines: 1. consumer state itself 2. 
state of consumer messages async sequence * KafkaConsumer: merge both state machines * Refactor + DocC * Review Franz + Blindspot Modifications: * `KafkaConsumer`: * end consumer message poll loop when async sequence drops message * do not sleep if we picked up reading new messages again after we finished reading a partition * `messageRunLoop`: * fix `fatalError` where `newMessagesProduced()` is invoked after `stopProducing()` * add func `batchConsumerPoll` that reads a batch of messages to avoid acquiring the lock in `messageRunLoop` too often --- .../KafkaConsumerConfiguration.swift | 33 +- Sources/Kafka/KafkaConsumer.swift | 436 +++++++++++++----- Sources/Kafka/KafkaConsumerEvent.swift | 2 - Sources/Kafka/KafkaProducer.swift | 12 +- Sources/Kafka/KafkaProducerEvent.swift | 2 - Sources/Kafka/RDKafka/RDKafkaClient.swift | 72 +-- 6 files changed, 368 insertions(+), 189 deletions(-) diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift index a8d7d26b..60215250 100644 --- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift @@ -23,6 +23,37 @@ public struct KafkaConsumerConfiguration { /// Default: `.milliseconds(100)` public var pollInterval: Duration = .milliseconds(100) + /// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``. + public struct BackPressureStrategy: Sendable, Hashable { + enum _BackPressureStrategy: Sendable, Hashable { + case watermark(low: Int, high: Int) + } + + let _internal: _BackPressureStrategy + + private init(backPressureStrategy: _BackPressureStrategy) { + self._internal = backPressureStrategy + } + + /// A back pressure strategy based on high and low watermarks. + /// + /// The consumer maintains a buffer size between a low watermark and a high watermark + /// to control the flow of incoming messages. 
+ /// + /// - Parameter low: The lower threshold for the buffer size (low watermark). + /// - Parameter high: The upper threshold for the buffer size (high watermark). + public static func watermark(low: Int, high: Int) -> BackPressureStrategy { + return .init(backPressureStrategy: .watermark(low: low, high: high)) + } + } + + /// The backpressure strategy to be used for message consumption. + /// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information. + public var backPressureStrategy: BackPressureStrategy = .watermark( + low: 10, + high: 50 + ) + /// A struct representing the different Kafka message consumption strategies. public struct ConsumptionStrategy: Sendable, Hashable { enum _ConsumptionStrategy: Sendable, Hashable { @@ -63,7 +94,7 @@ public struct KafkaConsumerConfiguration { } /// The strategy used for consuming messages. - /// See ``KafkaConfiguration/ConsumptionStrategy`` for more information. + /// See ``KafkaConsumerConfiguration/ConsumptionStrategy-swift.struct-swift.struct`` for more information. public var consumptionStrategy: ConsumptionStrategy // MARK: - Consumer-specific Config Properties diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index a2dfa239..1163f862 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -17,21 +17,37 @@ import NIOConcurrencyHelpers import NIOCore import ServiceLifecycle -// MARK: - KafkaConsumerCloseOnTerminate +// MARK: - KafkaConsumerEventsDelegate -/// `NIOAsyncSequenceProducerDelegate` that terminates the closes the producer when -/// `didTerminate()` is invoked. -internal struct KafkaConsumerCloseOnTerminate: Sendable { +/// `NIOAsyncSequenceProducerDelegate` for ``KafkaConsumerEvents``. 
+internal struct KafkaConsumerEventsDelegate: Sendable { let stateMachine: NIOLockedValueBox } -extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate { +extension KafkaConsumerEventsDelegate: NIOAsyncSequenceProducerDelegate { func produceMore() { return // No back pressure } func didTerminate() { - self.stateMachine.withLockedValue { $0.messageSequenceTerminated() } + return // We have to call poll for events anyway, nothing to do here + } +} + +// MARK: - KafkaConsumerMessagesDelegate + +/// `NIOAsyncSequenceProducerDelegate` for ``KafkaConsumerMessages``. +internal struct KafkaConsumerMessagesDelegate: Sendable { + let stateMachine: NIOLockedValueBox +} + +extension KafkaConsumerMessagesDelegate: NIOAsyncSequenceProducerDelegate { + func produceMore() { + self.stateMachine.withLockedValue { $0.produceMore() } + } + + func didTerminate() { + self.stateMachine.withLockedValue { $0.finishMessageConsumption() } } } @@ -41,7 +57,7 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate { public struct KafkaConsumerEvents: Sendable, AsyncSequence { public typealias Element = KafkaConsumerEvent typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure - typealias WrappedSequence = NIOAsyncSequenceProducer + typealias WrappedSequence = NIOAsyncSequenceProducer let wrappedSequence: WrappedSequence /// `AsynceIteratorProtocol` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka. 
@@ -65,12 +81,12 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { let stateMachine: NIOLockedValueBox public typealias Element = KafkaConsumerMessage - typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure + typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark typealias WrappedSequence = NIOThrowingAsyncSequenceProducer< Result, Error, BackPressureStrategy, - KafkaConsumerCloseOnTerminate + KafkaConsumerMessagesDelegate > let wrappedSequence: WrappedSequence @@ -127,8 +143,8 @@ public final class KafkaConsumer: Sendable, Service { typealias Producer = NIOThrowingAsyncSequenceProducer< Result, Error, - NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure, - KafkaConsumerCloseOnTerminate + NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, + KafkaConsumerMessagesDelegate > /// The configuration object of the consumer client. private let configuration: KafkaConsumerConfiguration @@ -142,7 +158,7 @@ public final class KafkaConsumer: Sendable, Service { // Private initializer, use factory method or convenience init to create KafkaConsumer /// Initialize a new ``KafkaConsumer``. - /// To listen to incoming messages, please subscribe to a list of topics using ``subscribe(topics:)`` + /// To listen to incoming messages, please subscribe to a list of topics using ``subscribe()`` /// or assign the consumer to a particular topic + partition pair using ``assign(topic:partition:offset:)``. 
/// /// - Parameters: @@ -163,8 +179,16 @@ public final class KafkaConsumer: Sendable, Service { let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence( elementType: Result.self, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), - delegate: KafkaConsumerCloseOnTerminate(stateMachine: self.stateMachine) + backPressureStrategy: { + switch configuration.backPressureStrategy._internal { + case .watermark(let lowWatermark, let highWatermark): + return NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark( + lowWatermark: lowWatermark, + highWatermark: highWatermark + ) + } + }(), + delegate: KafkaConsumerMessagesDelegate(stateMachine: self.stateMachine) ) self.messages = KafkaConsumerMessages( @@ -178,9 +202,6 @@ public final class KafkaConsumer: Sendable, Service { source: sourceAndSequence.source ) } - - // Forward main queue events to the consumer queue. - try client.pollSetConsumer() } /// Initialize a new ``KafkaConsumer``. 
@@ -196,9 +217,7 @@ public final class KafkaConsumer: Sendable, Service { configuration: KafkaConsumerConfiguration, logger: Logger ) throws { - let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) - - var subscribedEvents: [RDKafkaEvent] = [.log, .fetch] + var subscribedEvents: [RDKafkaEvent] = [.log] // Only listen to offset commit events when autoCommit is false if configuration.isAutoCommitEnabled == false { subscribedEvents.append(.offsetCommit) @@ -211,6 +230,8 @@ public final class KafkaConsumer: Sendable, Service { logger: logger ) + let stateMachine = NIOLockedValueBox(StateMachine()) + try self.init( client: client, stateMachine: stateMachine, @@ -236,9 +257,7 @@ public final class KafkaConsumer: Sendable, Service { configuration: KafkaConsumerConfiguration, logger: Logger ) throws -> (KafkaConsumer, KafkaConsumerEvents) { - let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) - - var subscribedEvents: [RDKafkaEvent] = [.log, .fetch] + var subscribedEvents: [RDKafkaEvent] = [.log] // Only listen to offset commit events when autoCommit is false if configuration.isAutoCommitEnabled == false { subscribedEvents.append(.offsetCommit) @@ -251,6 +270,8 @@ public final class KafkaConsumer: Sendable, Service { logger: logger ) + let stateMachine = NIOLockedValueBox(StateMachine()) + let consumer = try KafkaConsumer( client: client, stateMachine: stateMachine, @@ -266,7 +287,7 @@ public final class KafkaConsumer: Sendable, Service { let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( elementType: KafkaConsumerEvent.self, backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), - delegate: KafkaConsumerCloseOnTerminate(stateMachine: stateMachine) + delegate: KafkaConsumerEventsDelegate(stateMachine: stateMachine) ) let eventsSequence = KafkaConsumerEvents(wrappedSequence: sourceAndSequence.sequence) @@ -331,26 +352,75 @@ public final class KafkaConsumer: Sendable, Service { try 
self.subscribe(topics: topics) } + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await self.eventRunLoop() + } + + group.addTask { + try await self.messageRunLoop() + } + + // Throw when one of the two child task throws + try await group.next() + try await group.next() + } + } + + /// Run loop polling Kafka for new events. + private func eventRunLoop() async throws { while !Task.isCancelled { - let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } + let nextAction = self.stateMachine.withLockedValue { $0.nextEventPollLoopAction() } switch nextAction { - case .pollForAndYieldMessage(let client, let source): - let events = client.eventPoll() - for event in events { - switch event { - case .consumerMessages(let result): - // We do not support back pressure, we can ignore the yield result - _ = source.yield(result) - default: - break // Ignore + case .pollForEvents(let client): + // Event poll to serve any events queued inside of `librdkafka`. + _ = client.eventPoll() + try await Task.sleep(for: self.configuration.pollInterval) + case .terminatePollLoop: + return + } + } + } + + /// Run loop polling Kafka for new consumer messages. + private func messageRunLoop() async throws { + while !Task.isCancelled { + let nextAction = self.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } + switch nextAction { + case .pollForAndYieldMessages(let client, let source): + // Poll for new consumer messages. 
+ let messageResults = self.batchConsumerPoll(client: client) + if messageResults.isEmpty { + self.stateMachine.withLockedValue { $0.waitForNewMessages() } + } else { + let yieldResult = source.yield(contentsOf: messageResults) + switch yieldResult { + case .produceMore: + break + case .stopProducing: + self.stateMachine.withLockedValue { $0.stopProducing() } + case .dropped: + return } } - try await Task.sleep(for: self.configuration.pollInterval) - case .pollWithoutYield(let client): - // Ignore poll result. - // We are just polling to serve any remaining events queued inside of `librdkafka`. - // All remaining queued consumer messages will get dropped and not be committed (marked as read). - _ = client.eventPoll() + case .pollForMessagesIfAvailable(let client, let source): + let messageResults = self.batchConsumerPoll(client: client) + if messageResults.isEmpty { + // Still no new messages, so sleep. + try await Task.sleep(for: self.configuration.pollInterval) + } else { + // New messages were produced to the partition that we previously finished reading. + let yieldResult = source.yield(contentsOf: messageResults) + switch yieldResult { + case .produceMore: + break + case .stopProducing: + self.stateMachine.withLockedValue { $0.stopProducing() } + case .dropped: + return + } + } + case .suspendPollLoop: try await Task.sleep(for: self.configuration.pollInterval) case .terminatePollLoop: return @@ -358,6 +428,36 @@ public final class KafkaConsumer: Sendable, Service { } } + /// Read `maxMessages` consumer messages from Kafka. + /// + /// - Parameters: + /// - client: Client used for handling the connection to the Kafka cluster. + /// - maxMessages: Maximum amount of consumer messages to read in this invocation. + private func batchConsumerPoll( + client: RDKafkaClient, + maxMessages: Int = 100 + ) -> [Result] { + var messageResults = [Result]() + messageResults.reserveCapacity(maxMessages) + + for _ in 0..? 
+ do { + if let message = try client.consumerPoll() { + result = .success(message) + } + } catch { + result = .failure(error) + } + + if let result { + messageResults.append(result) + } + } + + return messageResults + } + /// Mark all messages up to the passed message in the topic as read. /// Schedules a commit and returns immediately. /// Any errors encountered after scheduling the commit will be discarded. @@ -424,12 +524,6 @@ public final class KafkaConsumer: Sendable, Service { client: client, logger: self.logger ) - case .triggerGracefulShutdownAndFinishSource(let client, let source): - source.finish() - self._triggerGracefulShutdown( - client: client, - logger: self.logger - ) case .none: return } @@ -456,8 +550,24 @@ public final class KafkaConsumer: Sendable, Service { extension KafkaConsumer { /// State machine representing the state of the ``KafkaConsumer``. struct StateMachine: Sendable { - /// A logger. - let logger: Logger + /// State of the event loop fetching new consumer messages. + enum MessagePollLoopState { + /// The sequence can take more messages. + /// + /// - Parameter source: The source for yielding new messages. + case running(source: Producer.Source) + /// Sequence suspended due to back pressure. + /// + /// - Parameter source: The source for yielding new messages. + case suspended(source: Producer.Source) + /// We have read to the end of a partition and are now waiting for new messages + /// to be produced. + /// + /// - Parameter source: The source for yielding new messages. + case waitingForMessages(source: Producer.Source) + /// The sequence has finished, and no more messages will be produced. + case finished + } /// The state of the ``StateMachine``. enum State: Sendable { @@ -468,7 +578,7 @@ extension KafkaConsumer { /// though ``subscribe()`` / ``assign()`` have not been invoked. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. 
- /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + /// - Parameter source: The source for yielding new messages. case initializing( client: RDKafkaClient, source: Producer.Source @@ -476,16 +586,8 @@ extension KafkaConsumer { /// The ``KafkaConsumer`` is consuming messages. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case consuming( - client: RDKafkaClient, - source: Producer.Source - ) - /// Consumer is still running but the messages asynchronous sequence was terminated. - /// All incoming messages will be dropped. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case consumptionStopped(client: RDKafkaClient) + /// - Parameter state: State of the event loop fetching new consumer messages. + case running(client: RDKafkaClient, messagePollLoopState: MessagePollLoopState) /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked. /// We are now in the process of commiting our last state to the broker. /// @@ -514,21 +616,60 @@ extension KafkaConsumer { } /// Action to be taken when wanting to poll for a new message. - enum PollLoopAction { + enum EventPollLoopAction { + /// Serve any queued callbacks on the event queue. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + case pollForEvents(client: RDKafkaClient) + /// Terminate the poll loop. + case terminatePollLoop + } + + /// Returns the next action to be taken when wanting to poll. + /// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken. + /// + /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. 
+ mutating func nextEventPollLoopAction() -> EventPollLoopAction { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, _): + return .pollForEvents(client: client) + case .finishing(let client): + if client.isConsumerClosed { + self.state = .finished + return .terminatePollLoop + } else { + return .pollForEvents(client: client) + } + case .finished: + return .terminatePollLoop + } + } + + /// Action to be taken when wanting to poll for a new message. + enum ConsumerPollLoopAction { /// Poll for a new ``KafkaConsumerMessage``. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case pollForAndYieldMessage( + case pollForAndYieldMessages( client: RDKafkaClient, source: Producer.Source ) - /// The ``KafkaConsumer`` stopped consuming messages or - /// is in the process of shutting down. - /// Poll to serve any queued events and commit outstanding state to the broker. + /// Poll for a new ``KafkaConsumerMessage`` or sleep for ``KafkaConsumerConfiguration/pollInterval`` + /// if there are no new messages to read from the partition. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case pollWithoutYield(client: RDKafkaClient) + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + case pollForMessagesIfAvailable( + client: RDKafkaClient, + source: Producer.Source + ) + /// Sleep for ``KafkaConsumerConfiguration/pollInterval``. + case suspendPollLoop /// Terminate the poll loop. 
case terminatePollLoop } @@ -537,24 +678,24 @@ extension KafkaConsumer { /// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken. /// /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. - mutating func nextPollLoopAction() -> PollLoopAction { + mutating func nextConsumerPollLoopAction() -> ConsumerPollLoopAction { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .consuming(let client, let source): - return .pollForAndYieldMessage(client: client, source: source) - case .consumptionStopped(let client): - return .pollWithoutYield(client: client) - case .finishing(let client): - if client.isConsumerClosed { - self.state = .finished + case .running(let client, let consumerState): + switch consumerState { + case .running(let source): + return .pollForAndYieldMessages(client: client, source: source) + case .suspended(source: _): + return .suspendPollLoop + case .waitingForMessages(let source): + return .pollForMessagesIfAvailable(client: client, source: source) + case .finished: return .terminatePollLoop - } else { - return .pollWithoutYield(client: client) } - case .finished: + case .finishing, .finished: return .terminatePollLoop } } @@ -574,30 +715,12 @@ extension KafkaConsumer { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing(let client, let source): - self.state = .consuming( - client: client, - source: source - ) + self.state = .running(client: client, messagePollLoopState: .running(source: source)) return .setUpConnection(client: client) - case .consuming, .consumptionStopped, .finishing, .finished: - fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer") - } - } - - /// The messages 
asynchronous sequence was terminated. - /// All incoming messages will be dropped. - mutating func messageSequenceTerminated() { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("Call to \(#function) before setUpConnection() was invoked") - case .consumptionStopped: - fatalError("messageSequenceTerminated() must not be invoked more than once") - case .consuming(let client, _): - self.state = .consumptionStopped(client: client) + case .running: + fatalError("\(#function) should not be invoked more than once") case .finishing, .finished: - break + fatalError("\(#function) should only be invoked when KafkaConsumer is running") } } @@ -617,10 +740,8 @@ extension KafkaConsumer { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: - fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consumptionStopped: - fatalError("Cannot store offset when consumption has been stopped") - case .consuming(let client, _): + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, _): return .storeOffset(client: client) case .finishing, .finished: return .terminateConsumerSequence @@ -632,9 +753,7 @@ extension KafkaConsumer { /// Do a commit. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case commit( - client: RDKafkaClient - ) + case commit(client: RDKafkaClient) /// Throw an error. The ``KafkaConsumer`` is closed. 
case throwClosedError } @@ -648,10 +767,8 @@ extension KafkaConsumer { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: - fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consumptionStopped: - fatalError("Cannot commit when consumption has been stopped") - case .consuming(let client, _): + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, _): return .commit(client: client) case .finishing, .finished: return .throwClosedError @@ -663,17 +780,7 @@ extension KafkaConsumer { /// Shut down the ``KafkaConsumer``. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case triggerGracefulShutdown( - client: RDKafkaClient - ) - /// Shut down the ``KafkaConsumer`` and finish the given `source` object. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case triggerGracefulShutdownAndFinishSource( - client: RDKafkaClient, - source: Producer.Source - ) + case triggerGracefulShutdown(client: RDKafkaClient) } /// Get action to be taken when wanting to do close the consumer. 
@@ -685,19 +792,92 @@ extension KafkaConsumer { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: - fatalError("subscribe() / assign() should have been invoked before \(#function)") - case .consuming(let client, let source): - self.state = .finishing(client: client) - return .triggerGracefulShutdownAndFinishSource( - client: client, - source: source - ) - case .consumptionStopped(let client): + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, _): self.state = .finishing(client: client) return .triggerGracefulShutdown(client: client) case .finishing, .finished: return nil } } + + // MARK: - Consumer Messages Poll Loop Actions + + /// The partition that was previously finished reading has got new messages produced to it. + mutating func newMessagesProduced() { + guard case .running(let client, let consumerState) = self.state else { + fatalError("\(#function) invoked while still in state \(self.state)") + } + + switch consumerState { + case .running, .suspended, .finished: + fatalError("\(#function) should not be invoked in state \(self.state)") + case .waitingForMessages(let source): + self.state = .running(client: client, messagePollLoopState: .running(source: source)) + } + } + + /// The consumer has read to the end of a partition and shall now go into a sleep loop until new messages are produced. 
+ mutating func waitForNewMessages() { + guard case .running(let client, let consumerState) = self.state else { + fatalError("\(#function) invoked while still in state \(self.state)") + } + + switch consumerState { + case .running(let source): + self.state = .running(client: client, messagePollLoopState: .waitingForMessages(source: source)) + case .suspended, .waitingForMessages, .finished: + fatalError("\(#function) should not be invoked in state \(self.state)") + } + } + + /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to produce more messages. + mutating func produceMore() { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + break // This case can be triggered by the KafkaConsumerMessagesDelegate + case .running(let client, let consumerState): + switch consumerState { + case .running, .waitingForMessages, .finished: + break + case .suspended(let source): + self.state = .running(client: client, messagePollLoopState: .running(source: source)) + } + case .finishing, .finished: + break + } + } + + /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to temporarily stop producing messages. + mutating func stopProducing() { + guard case .running(let client, let consumerState) = self.state else { + fatalError("\(#function) invoked while still in state \(self.state)") + } + + switch consumerState { + case .suspended, .finished: + break + case .running(let source): + self.state = .running(client: client, messagePollLoopState: .suspended(source: source)) + case .waitingForMessages(let source): + self.state = .running(client: client, messagePollLoopState: .suspended(source: source)) + } + } + + /// The ``KafkaConsumerMessages`` asynchronous sequence was terminated. 
+ mutating func finishMessageConsumption() { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, _): + self.state = .running(client: client, messagePollLoopState: .finished) + case .finishing, .finished: + break + } + } } } diff --git a/Sources/Kafka/KafkaConsumerEvent.swift b/Sources/Kafka/KafkaConsumerEvent.swift index ac565f39..f654797b 100644 --- a/Sources/Kafka/KafkaConsumerEvent.swift +++ b/Sources/Kafka/KafkaConsumerEvent.swift @@ -21,8 +21,6 @@ public enum KafkaConsumerEvent: Sendable, Hashable { switch event { case .deliveryReport: fatalError("Cannot cast \(event) to KafkaConsumerEvent") - case .consumerMessages: - fatalError("Consumer messages should be handled in the KafkaConsumerMessages asynchronous sequence") } } } diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift index 6e9b29db..2c6dc4fc 100644 --- a/Sources/Kafka/KafkaProducer.swift +++ b/Sources/Kafka/KafkaProducer.swift @@ -294,7 +294,7 @@ extension KafkaProducer { /// All incoming events will be dropped. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case consumptionStopped(client: RDKafkaClient) + case eventConsumptionFinished(client: RDKafkaClient) /// ``KafkaProducer/triggerGracefulShutdown()`` was invoked so we are flushing /// any messages that wait to be sent and serve any remaining queued callbacks. 
/// @@ -359,7 +359,7 @@ extension KafkaProducer { fatalError("\(#function) invoked while still in state \(self.state)") case .started(let client, _, let source, _): return .pollAndYield(client: client, source: source) - case .consumptionStopped(let client): + case .eventConsumptionFinished(let client): return .pollWithoutYield(client: client) case .finishing(let client, let source): return .flushFinishSourceAndTerminatePollLoop(client: client, source: source) @@ -400,7 +400,7 @@ extension KafkaProducer { newMessageID: newMessageID, topicHandles: topicHandles ) - case .consumptionStopped: + case .eventConsumptionFinished: throw KafkaError.connectionClosed(reason: "Sequence consuming events was abruptly terminated, producer closed") case .finishing: throw KafkaError.connectionClosed(reason: "Producer in the process of finishing") @@ -423,10 +423,10 @@ extension KafkaProducer { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .consumptionStopped: + case .eventConsumptionFinished: fatalError("messageSequenceTerminated() must not be invoked more than once") case .started(let client, _, let source, _): - self.state = .consumptionStopped(client: client) + self.state = .eventConsumptionFinished(client: client) return .finishSource(source: source) case .finishing(let client, let source): // Setting source to nil to prevent incoming events from buffering in `source` @@ -447,7 +447,7 @@ extension KafkaProducer { fatalError("\(#function) invoked while still in state \(self.state)") case .started(let client, _, let source, _): self.state = .finishing(client: client, source: source) - case .consumptionStopped(let client): + case .eventConsumptionFinished(let client): self.state = .finishing(client: client, source: nil) case .finishing, .finished: break diff --git a/Sources/Kafka/KafkaProducerEvent.swift b/Sources/Kafka/KafkaProducerEvent.swift index 6619ed2a..9684f146 100644 --- 
a/Sources/Kafka/KafkaProducerEvent.swift +++ b/Sources/Kafka/KafkaProducerEvent.swift @@ -23,8 +23,6 @@ public enum KafkaProducerEvent: Sendable, Hashable { switch event { case .deliveryReport(results: let results): self = .deliveryReports(results) - case .consumerMessages: - fatalError("Cannot cast \(event) to KafkaProducerEvent") } } } diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 47850e7b..623f2c34 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -44,22 +44,7 @@ final class RDKafkaClient: Sendable { ) { self.kafkaHandle = kafkaHandle self.logger = logger - - if type == .consumer { - if let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle) { - // (Important) - // Polling the queue counts as a consumer poll, and will reset the timer for `max.poll.interval.ms`. - self.queue = consumerQueue - } else { - fatalError(""" - Internal error: failed to get consumer queue. \ - A group.id should be set even when the client is not part of a consumer group. \ - See https://github.com/edenhill/librdkafka/issues/3261 for more information. - """) - } - } else { - self.queue = rd_kafka_queue_get_main(self.kafkaHandle) - } + self.queue = rd_kafka_queue_get_main(self.kafkaHandle) rd_kafka_set_log_queue(self.kafkaHandle, self.queue) } @@ -310,7 +295,6 @@ final class RDKafkaClient: Sendable { /// Swift wrapper for events from `librdkafka`'s event queue. enum KafkaEvent { case deliveryReport(results: [KafkaDeliveryReport]) - case consumerMessages(result: Result) } /// Poll the event `rd_kafka_queue_t` for new events. 
@@ -333,10 +317,6 @@ final class RDKafkaClient: Sendable { case .deliveryReport: let forwardEvent = self.handleDeliveryReportEvent(event) events.append(forwardEvent) - case .fetch: - if let forwardEvent = self.handleFetchEvent(event) { - events.append(forwardEvent) - } case .log: self.handleLogEvent(event) case .offsetCommit: @@ -372,26 +352,6 @@ final class RDKafkaClient: Sendable { return .deliveryReport(results: deliveryReportResults) } - /// Handle event of type `RDKafkaEvent.fetch`. - /// - /// - Parameter event: Pointer to underlying `rd_kafka_event_t`. - /// - Returns: `KafkaEvent` to be returned as part of ``KafkaClient.eventPoll()`. - private func handleFetchEvent(_ event: OpaquePointer?) -> KafkaEvent? { - do { - // RD_KAFKA_EVENT_FETCH only returns a single message: - // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a3a855eb7bdf17f5797d4911362a5fc7c - if let messagePointer = rd_kafka_event_message_next(event) { - let message = try KafkaConsumerMessage(messagePointer: messagePointer) - return .consumerMessages(result: .success(message)) - } else { - return nil - } - } catch { - return .consumerMessages(result: .failure(error)) - } - // The returned message(s) MUST NOT be freed with rd_kafka_message_destroy(). - } - /// Handle event of type `RDKafkaEvent.log`. /// /// - Parameter event: Pointer to underlying `rd_kafka_event_t`. @@ -451,18 +411,30 @@ final class RDKafkaClient: Sendable { actualCallback(.success(())) } - /// Redirect the main ``RDKafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s - /// queue (``RDKafkaClient/consumerPoll``). + /// Request a new message from the Kafka cluster. /// - /// Events that would be triggered by ``RDKafkaClient/poll(timeout:)`` - /// are now triggered by ``RDKafkaClient/consumerPoll``. + /// - Important: This method should only be invoked from ``KafkaConsumer``. 
/// - /// - Warning: It is not allowed to call ``RDKafkaClient/poll(timeout:)`` after ``RDKafkaClient/pollSetConsumer``. - func pollSetConsumer() throws { - let result = rd_kafka_poll_set_consumer(self.kafkaHandle) - if result != RD_KAFKA_RESP_ERR_NO_ERROR { - throw KafkaError.rdKafkaError(wrapping: result) + /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages. + /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. + func consumerPoll() throws -> KafkaConsumerMessage? { + guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, 0) else { + // No error, there might be no more messages + return nil } + + defer { + // Destroy message otherwise poll() will block forever + rd_kafka_message_destroy(messagePointer) + } + + // Reached the end of the topic+partition queue on the broker + if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF { + return nil + } + + let message = try KafkaConsumerMessage(messagePointer: messagePointer) + return message } /// Subscribe to topic set using balanced consumer groups.