From ff7555c4c6c119de5b5579da2c86d50fd6f3a883 Mon Sep 17 00:00:00 2001
From: Felix Schlegel
Date: Tue, 10 Oct 2023 23:14:46 +0100
Subject: [PATCH 1/5] Add Back Pressure to `KafkaConsumer`

Motivation:

Closes #131.

Modifications:

* re-add `KafkaConsumerConfiguration.backPressureStrategy: BackPressureStrategy`, currently allowing users to add high-low-watermark backpressure to their `KafkaConsumer`s
* `KafkaConsumer`:
  * make `KafkaConsumerMessages` use `NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark` as backpressure strategy
  * remove `rd_kafka_poll_set_consumer` -> use two separate queues for consumer events and consumer messages so we can exert backpressure on the consumer message queue
  * remove idle polling mechanism where incoming messages were discarded when `KafkaConsumerMessages` was terminated -> we now have two independent queues
  * rename `.pollForAndYieldMessage` -> `.pollForEventsAndMessage`
  * refactor `State` and add `ConsumerMessagesSequenceState`
* `KafkaProducer`:
  * rename `.consumptionStopped` -> `.eventConsumptionFinished`
* `RDKafkaClient`:
  * bring back `consumerPoll()`
  * `eventPoll()`: only poll the main queue for events since consumer messages are now handled on a different queue
---
 .../KafkaConsumerConfiguration.swift       |  33 ++-
 Sources/Kafka/KafkaConsumer.swift          | 230 ++++++++++++------
 Sources/Kafka/KafkaConsumerEvent.swift     |   2 -
 Sources/Kafka/KafkaProducer.swift          |  12 +-
 Sources/Kafka/KafkaProducerEvent.swift     |   2 -
 Sources/Kafka/RDKafka/RDKafkaClient.swift  |  72 ++----
 6 files changed, 219 insertions(+), 132 deletions(-)

diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
index a8d7d26b..60215250 100644
--- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
+++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
@@ -23,6 +23,37 @@ public struct KafkaConsumerConfiguration {
     /// Default: `.milliseconds(100)`
     public var pollInterval: Duration = .milliseconds(100)
 
+    /// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``.
+    public struct BackPressureStrategy: Sendable, Hashable {
+        enum _BackPressureStrategy: Sendable, Hashable {
+            case watermark(low: Int, high: Int)
+        }
+
+        let _internal: _BackPressureStrategy
+
+        private init(backPressureStrategy: _BackPressureStrategy) {
+            self._internal = backPressureStrategy
+        }
+
+        /// A back pressure strategy based on high and low watermarks.
+        ///
+        /// The consumer maintains a buffer size between a low watermark and a high watermark
+        /// to control the flow of incoming messages.
+        ///
+        /// - Parameter low: The lower threshold for the buffer size (low watermark).
+        /// - Parameter high: The upper threshold for the buffer size (high watermark).
+        public static func watermark(low: Int, high: Int) -> BackPressureStrategy {
+            return .init(backPressureStrategy: .watermark(low: low, high: high))
+        }
+    }
+
+    /// The backpressure strategy to be used for message consumption.
+    /// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information.
+    public var backPressureStrategy: BackPressureStrategy = .watermark(
+        low: 10,
+        high: 50
+    )
+
     /// A struct representing the different Kafka message consumption strategies.
     public struct ConsumptionStrategy: Sendable, Hashable {
         enum _ConsumptionStrategy: Sendable, Hashable {
@@ -63,7 +94,7 @@ public struct KafkaConsumerConfiguration {
     }
 
     /// The strategy used for consuming messages.
- /// See ``KafkaConfiguration/ConsumptionStrategy`` for more information. + /// See ``KafkaConsumerConfiguration/ConsumptionStrategy-swift.struct-swift.struct`` for more information. public var consumptionStrategy: ConsumptionStrategy // MARK: - Consumer-specific Config Properties diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index a2dfa239..70226040 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -17,19 +17,35 @@ import NIOConcurrencyHelpers import NIOCore import ServiceLifecycle -// MARK: - KafkaConsumerCloseOnTerminate +// MARK: - KafkaConsumerEventsDelegate -/// `NIOAsyncSequenceProducerDelegate` that terminates the closes the producer when -/// `didTerminate()` is invoked. -internal struct KafkaConsumerCloseOnTerminate: Sendable { +/// `NIOAsyncSequenceProducerDelegate` for ``KafkaConsumerEvents``. +internal struct KafkaConsumerEventsDelegate: Sendable { let stateMachine: NIOLockedValueBox } -extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate { +extension KafkaConsumerEventsDelegate: NIOAsyncSequenceProducerDelegate { func produceMore() { return // No back pressure } + func didTerminate() { + return // We have to call poll for events anyway, nothing to do here + } +} + +// MARK: - KafkaConsumerMessagesDelegate + +/// `NIOAsyncSequenceProducerDelegate` for ``KafkaConsumerMessages``. +internal struct KafkaConsumerMessagesDelegate: Sendable { + let stateMachine: NIOLockedValueBox +} + +extension KafkaConsumerMessagesDelegate: NIOAsyncSequenceProducerDelegate { + func produceMore() { + self.stateMachine.withLockedValue { $0.produceMore() } + } + func didTerminate() { self.stateMachine.withLockedValue { $0.messageSequenceTerminated() } } @@ -41,7 +57,7 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate { public struct KafkaConsumerEvents: Sendable, AsyncSequence { public typealias Element = KafkaConsumerEvent typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure - typealias WrappedSequence = NIOAsyncSequenceProducer + typealias WrappedSequence = NIOAsyncSequenceProducer let wrappedSequence: WrappedSequence /// `AsynceIteratorProtocol` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka. @@ -65,12 +81,12 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { let stateMachine: NIOLockedValueBox public typealias Element = KafkaConsumerMessage - typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure + typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark typealias WrappedSequence = NIOThrowingAsyncSequenceProducer< Result, Error, BackPressureStrategy, - KafkaConsumerCloseOnTerminate + KafkaConsumerMessagesDelegate > let wrappedSequence: WrappedSequence @@ -127,8 +143,8 @@ public final class KafkaConsumer: Sendable, Service { typealias Producer = NIOThrowingAsyncSequenceProducer< Result, Error, - NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure, - KafkaConsumerCloseOnTerminate + NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, + KafkaConsumerMessagesDelegate > /// The configuration object of the consumer client. 
private let configuration: KafkaConsumerConfiguration @@ -163,8 +179,16 @@ public final class KafkaConsumer: Sendable, Service { let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence( elementType: Result.self, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), - delegate: KafkaConsumerCloseOnTerminate(stateMachine: self.stateMachine) + backPressureStrategy: { + switch configuration.backPressureStrategy._internal { + case .watermark(let lowWatermark, let highWatermark): + return NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark( + lowWatermark: lowWatermark, + highWatermark: highWatermark + ) + } + }(), + delegate: KafkaConsumerMessagesDelegate(stateMachine: self.stateMachine) ) self.messages = KafkaConsumerMessages( @@ -178,9 +202,6 @@ public final class KafkaConsumer: Sendable, Service { source: sourceAndSequence.source ) } - - // Forward main queue events to the consumer queue. - try client.pollSetConsumer() } /// Initialize a new ``KafkaConsumer``. @@ -198,7 +219,7 @@ public final class KafkaConsumer: Sendable, Service { ) throws { let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) - var subscribedEvents: [RDKafkaEvent] = [.log, .fetch] + var subscribedEvents: [RDKafkaEvent] = [.log] // Only listen to offset commit events when autoCommit is false if configuration.isAutoCommitEnabled == false { subscribedEvents.append(.offsetCommit) @@ -238,7 +259,7 @@ public final class KafkaConsumer: Sendable, Service { ) throws -> (KafkaConsumer, KafkaConsumerEvents) { let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) - var subscribedEvents: [RDKafkaEvent] = [.log, .fetch] + var subscribedEvents: [RDKafkaEvent] = [.log] // Only listen to offset commit events when autoCommit is false if configuration.isAutoCommitEnabled == false { subscribedEvents.append(.offsetCommit) @@ -266,7 +287,7 @@ public final class KafkaConsumer: Sendable, Service { let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( elementType: KafkaConsumerEvent.self, backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), - delegate: KafkaConsumerCloseOnTerminate(stateMachine: stateMachine) + delegate: KafkaConsumerEventsDelegate(stateMachine: stateMachine) ) let eventsSequence = KafkaConsumerEvents(wrappedSequence: sourceAndSequence.sequence) @@ -334,22 +355,32 @@ public final class KafkaConsumer: Sendable, Service { while !Task.isCancelled { let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } switch nextAction { - case .pollForAndYieldMessage(let client, let source): - let events = client.eventPoll() - for event in events { - switch event { - case .consumerMessages(let result): - // We do not support back pressure, we can ignore the yield result - _ = source.yield(result) - default: - break // Ignore + case .pollForEventsAndMessage(let client, let source): + // Event poll to serve any events queued inside of `librdkafka`. + _ = client.eventPoll() + // Poll for new consumer message. + var result: Result? + do { + if let message = try client.consumerPoll() { + result = .success(message) + } + } catch { + result = .failure(error) + } + if let result { + let yieldResult = source.yield(result) + switch yieldResult { + case .produceMore: + break + case .stopProducing: + self.stateMachine.withLockedValue { $0.stopProducing() } + case .dropped: + break } } try await Task.sleep(for: self.configuration.pollInterval) - case .pollWithoutYield(let client): - // Ignore poll result. 
- // We are just polling to serve any remaining events queued inside of `librdkafka`. - // All remaining queued consumer messages will get dropped and not be committed (marked as read). + case .pollForEvents(let client): + // Event poll to serve any events queued inside of `librdkafka`. _ = client.eventPoll() try await Task.sleep(for: self.configuration.pollInterval) case .terminatePollLoop: @@ -459,6 +490,20 @@ extension KafkaConsumer { /// A logger. let logger: Logger + /// Represents the state of the ``KafkaConsumerMessages`` asynchronous sequence. + enum ConsumerMessagesSequenceState { + /// The sequence can take more messages. + /// + /// - Parameter source: The source for yielding new messages. + case open(source: Producer.Source) + /// Sequence suspended due to back pressure. + /// + /// - Parameter source: The source for yielding new messages. + case suspended(source: Producer.Source) + /// The sequence has finished, and no more messages will be produced. + case finished + } + /// The state of the ``StateMachine``. enum State: Sendable { /// The state machine has been initialized with init() but is not yet Initialized @@ -468,24 +513,19 @@ extension KafkaConsumer { /// though ``subscribe()`` / ``assign()`` have not been invoked. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + /// - Parameter messagesSequenceState: State of ``KafkaConsumerMessages`` `AsyncSequence` we use to give ``KafkaConsumerMessage``s back to the user. case initializing( client: RDKafkaClient, - source: Producer.Source + messagesSequenceState: ConsumerMessagesSequenceState ) /// The ``KafkaConsumer`` is consuming messages. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + /// - Parameter messagesSequenceState: State of ``KafkaConsumerMessages`` `AsyncSequence` we use to give ``KafkaConsumerMessage``s back to the user. case consuming( client: RDKafkaClient, - source: Producer.Source + messagesSequenceState: ConsumerMessagesSequenceState ) - /// Consumer is still running but the messages asynchronous sequence was terminated. - /// All incoming messages will be dropped. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case consumptionStopped(client: RDKafkaClient) /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked. /// We are now in the process of commiting our last state to the broker. /// @@ -509,7 +549,7 @@ extension KafkaConsumer { } self.state = .initializing( client: client, - source: source + messagesSequenceState: .open(source: source) ) } @@ -519,16 +559,16 @@ extension KafkaConsumer { /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case pollForAndYieldMessage( + case pollForEventsAndMessage( client: RDKafkaClient, source: Producer.Source ) - /// The ``KafkaConsumer`` stopped consuming messages or - /// is in the process of shutting down. - /// Poll to serve any queued events and commit outstanding state to the broker. + /// Serve any queued callbacks on the event queue. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. 
- case pollWithoutYield(client: RDKafkaClient) + case pollForEvents( + client: RDKafkaClient + ) /// Terminate the poll loop. case terminatePollLoop } @@ -543,16 +583,19 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .consuming(let client, let source): - return .pollForAndYieldMessage(client: client, source: source) - case .consumptionStopped(let client): - return .pollWithoutYield(client: client) + case .consuming(let client, let messagesSequenceState): + switch messagesSequenceState { + case .open(let source): + return .pollForEventsAndMessage(client: client, source: source) + case .suspended, .finished: + return .pollForEvents(client: client) + } case .finishing(let client): if client.isConsumerClosed { self.state = .finished return .terminatePollLoop } else { - return .pollWithoutYield(client: client) + return .pollForEvents(client: client) } case .finished: return .terminatePollLoop @@ -573,29 +616,70 @@ extension KafkaConsumer { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing(let client, let source): + case .initializing(let client, let messagesSequenceState): self.state = .consuming( client: client, - source: source + messagesSequenceState: messagesSequenceState ) return .setUpConnection(client: client) - case .consuming, .consumptionStopped, .finishing, .finished: + case .consuming, .finishing, .finished: fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer") } } /// The messages asynchronous sequence was terminated. - /// All incoming messages will be dropped. + /// Stop polling for new messages. mutating func messageSequenceTerminated() { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Call to \(#function) before setUpConnection() was invoked") - case .consumptionStopped: - fatalError("messageSequenceTerminated() must not be invoked more than once") - case .consuming(let client, _): - self.state = .consumptionStopped(client: client) + case .consuming(let client, let messagesSequenceState): + switch messagesSequenceState { + case .suspended, .open: + self.state = .consuming(client: client, messagesSequenceState: .finished) + case .finished: + fatalError("messageSequenceTerminated() must not be invoked more than once") + } + case .finishing, .finished: + break + } + } + + /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to produce more messages. + mutating func produceMore() { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + break // This case can be triggered by `NIOAsyncSequenceProducerDelegate`, ignore + case .consuming(let client, let messagesSequenceState): + switch messagesSequenceState { + case .open, .finished: + break + case .suspended(let source): + self.state = .consuming(client: client, messagesSequenceState: .open(source: source)) + } + case .finishing, .finished: + break + } + } + + /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to temporarily stop producing messages. 
+ mutating func stopProducing() { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + fatalError("Call to \(#function) before setUpConnection() was invoked") + case .consuming(let client, let messagesSequenceState): + switch messagesSequenceState { + case .suspended, .finished: + break + case .open(let source): + self.state = .consuming(client: client, messagesSequenceState: .suspended(source: source)) + } case .finishing, .finished: break } @@ -618,8 +702,6 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consumptionStopped: - fatalError("Cannot store offset when consumption has been stopped") case .consuming(let client, _): return .storeOffset(client: client) case .finishing, .finished: @@ -649,8 +731,6 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consumptionStopped: - fatalError("Cannot commit when consumption has been stopped") case .consuming(let client, _): return .commit(client: client) case .finishing, .finished: @@ -686,15 +766,23 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("subscribe() / assign() should have been invoked before \(#function)") - case .consuming(let client, let source): + case .consuming(let client, let messagesSequenceState): self.state = .finishing(client: client) - return .triggerGracefulShutdownAndFinishSource( - client: client, - source: source - ) - case .consumptionStopped(let client): - self.state = .finishing(client: client) - return .triggerGracefulShutdown(client: client) + + switch messagesSequenceState { + case .open(let source): + return .triggerGracefulShutdownAndFinishSource( + client: client, + source: source + ) + case .suspended(let source): + return .triggerGracefulShutdownAndFinishSource( + client: client, + source: source + ) + case .finished: + return .triggerGracefulShutdown(client: client) + } case .finishing, .finished: return nil } diff --git a/Sources/Kafka/KafkaConsumerEvent.swift b/Sources/Kafka/KafkaConsumerEvent.swift index ac565f39..f654797b 100644 --- a/Sources/Kafka/KafkaConsumerEvent.swift +++ b/Sources/Kafka/KafkaConsumerEvent.swift @@ -21,8 +21,6 @@ public enum KafkaConsumerEvent: Sendable, Hashable { switch event { case .deliveryReport: fatalError("Cannot cast \(event) to KafkaConsumerEvent") - case .consumerMessages: - fatalError("Consumer messages should be handled in the KafkaConsumerMessages asynchronous sequence") } } } diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift index 6e9b29db..2c6dc4fc 100644 --- a/Sources/Kafka/KafkaProducer.swift +++ b/Sources/Kafka/KafkaProducer.swift @@ -294,7 +294,7 @@ extension KafkaProducer { /// All incoming events will be dropped. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case consumptionStopped(client: RDKafkaClient) + case eventConsumptionFinished(client: RDKafkaClient) /// ``KafkaProducer/triggerGracefulShutdown()`` was invoked so we are flushing /// any messages that wait to be sent and serve any remaining queued callbacks. 
/// @@ -359,7 +359,7 @@ extension KafkaProducer { fatalError("\(#function) invoked while still in state \(self.state)") case .started(let client, _, let source, _): return .pollAndYield(client: client, source: source) - case .consumptionStopped(let client): + case .eventConsumptionFinished(let client): return .pollWithoutYield(client: client) case .finishing(let client, let source): return .flushFinishSourceAndTerminatePollLoop(client: client, source: source) @@ -400,7 +400,7 @@ extension KafkaProducer { newMessageID: newMessageID, topicHandles: topicHandles ) - case .consumptionStopped: + case .eventConsumptionFinished: throw KafkaError.connectionClosed(reason: "Sequence consuming events was abruptly terminated, producer closed") case .finishing: throw KafkaError.connectionClosed(reason: "Producer in the process of finishing") @@ -423,10 +423,10 @@ extension KafkaProducer { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .consumptionStopped: + case .eventConsumptionFinished: fatalError("messageSequenceTerminated() must not be invoked more than once") case .started(let client, _, let source, _): - self.state = .consumptionStopped(client: client) + self.state = .eventConsumptionFinished(client: client) return .finishSource(source: source) case .finishing(let client, let source): // Setting source to nil to prevent incoming events from buffering in `source` @@ -447,7 +447,7 @@ extension KafkaProducer { fatalError("\(#function) invoked while still in state \(self.state)") case .started(let client, _, let source, _): self.state = .finishing(client: client, source: source) - case .consumptionStopped(let client): + case .eventConsumptionFinished(let client): self.state = .finishing(client: client, source: nil) case .finishing, .finished: break diff --git a/Sources/Kafka/KafkaProducerEvent.swift b/Sources/Kafka/KafkaProducerEvent.swift index 6619ed2a..9684f146 100644 --- a/Sources/Kafka/KafkaProducerEvent.swift +++ b/Sources/Kafka/KafkaProducerEvent.swift @@ -23,8 +23,6 @@ public enum KafkaProducerEvent: Sendable, Hashable { switch event { case .deliveryReport(results: let results): self = .deliveryReports(results) - case .consumerMessages: - fatalError("Cannot cast \(event) to KafkaProducerEvent") } } } diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index da415f22..8238cb91 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -44,22 +44,7 @@ final class RDKafkaClient: Sendable { ) { self.kafkaHandle = kafkaHandle self.logger = logger - - if type == .consumer { - if let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle) { - // (Important) - // Polling the queue counts as a consumer poll, and will reset the timer for `max.poll.interval.ms`. - self.queue = consumerQueue - } else { - fatalError(""" - Internal error: failed to get consumer queue. \ - A group.id should be set even when the client is not part of a consumer group. \ - See https://github.com/edenhill/librdkafka/issues/3261 for more information. - """) - } - } else { - self.queue = rd_kafka_queue_get_main(self.kafkaHandle) - } + self.queue = rd_kafka_queue_get_main(self.kafkaHandle) rd_kafka_set_log_queue(self.kafkaHandle, self.queue) } @@ -310,7 +295,6 @@ final class RDKafkaClient: Sendable { /// Swift wrapper for events from `librdkafka`'s event queue. 
enum KafkaEvent { case deliveryReport(results: [KafkaDeliveryReport]) - case consumerMessages(result: Result) } /// Poll the event `rd_kafka_queue_t` for new events. @@ -333,10 +317,6 @@ final class RDKafkaClient: Sendable { case .deliveryReport: let forwardEvent = self.handleDeliveryReportEvent(event) events.append(forwardEvent) - case .fetch: - if let forwardEvent = self.handleFetchEvent(event) { - events.append(forwardEvent) - } case .log: self.handleLogEvent(event) case .offsetCommit: @@ -372,26 +352,6 @@ final class RDKafkaClient: Sendable { return .deliveryReport(results: deliveryReportResults) } - /// Handle event of type `RDKafkaEvent.fetch`. - /// - /// - Parameter event: Pointer to underlying `rd_kafka_event_t`. - /// - Returns: `KafkaEvent` to be returned as part of ``KafkaClient.eventPoll()`. - private func handleFetchEvent(_ event: OpaquePointer?) -> KafkaEvent? { - do { - // RD_KAFKA_EVENT_FETCH only returns a single message: - // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a3a855eb7bdf17f5797d4911362a5fc7c - if let messagePointer = rd_kafka_event_message_next(event) { - let message = try KafkaConsumerMessage(messagePointer: messagePointer) - return .consumerMessages(result: .success(message)) - } else { - return nil - } - } catch { - return .consumerMessages(result: .failure(error)) - } - // The returned message(s) MUST NOT be freed with rd_kafka_message_destroy(). - } - /// Handle event of type `RDKafkaEvent.log`. /// /// - Parameter event: Pointer to underlying `rd_kafka_event_t`. @@ -451,18 +411,30 @@ final class RDKafkaClient: Sendable { actualCallback(.success(())) } - /// Redirect the main ``RDKafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s - /// queue (``RDKafkaClient/consumerPoll``). + /// Request a new message from the Kafka cluster. /// - /// Events that would be triggered by ``RDKafkaClient/poll(timeout:)`` - /// are now triggered by ``RDKafkaClient/consumerPoll``. + /// - Important: This method should only be invoked from ``KafkaConsumer``. /// - /// - Warning: It is not allowed to call ``RDKafkaClient/poll(timeout:)`` after ``RDKafkaClient/pollSetConsumer``. - func pollSetConsumer() throws { - let result = rd_kafka_poll_set_consumer(self.kafkaHandle) - if result != RD_KAFKA_RESP_ERR_NO_ERROR { - throw KafkaError.rdKafkaError(wrapping: result) + /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages. + /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. + func consumerPoll() throws -> KafkaConsumerMessage? { + guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, 0) else { + // No error, there might be no more messages + return nil } + + defer { + // Destroy message otherwise poll() will block forever + rd_kafka_message_destroy(messagePointer) + } + + // Reached the end of the topic+partition queue on the broker + if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF { + return nil + } + + let message = try KafkaConsumerMessage(messagePointer: messagePointer) + return message } /// Subscribe to topic set using balanced consumer groups. From 8d23aa6a546940d617190ca77709dd2f3e449268 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Fri, 13 Oct 2023 13:18:03 +0100 Subject: [PATCH 2/5] KafkaConsumer: two state machines Modifications: * have two state machines: 1. consumer state itself 2. 
state of consumer messages async sequence --- Sources/Kafka/KafkaConsumer.swift | 459 ++++++++++++++---------- Tests/IntegrationTests/KafkaTests.swift | 50 +++ 2 files changed, 323 insertions(+), 186 deletions(-) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 70226040..b28533e5 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -38,7 +38,7 @@ extension KafkaConsumerEventsDelegate: NIOAsyncSequenceProducerDelegate { /// `NIOAsyncSequenceProducerDelegate` for ``KafkaConsumerMessages``. internal struct KafkaConsumerMessagesDelegate: Sendable { - let stateMachine: NIOLockedValueBox + let stateMachine: NIOLockedValueBox } extension KafkaConsumerMessagesDelegate: NIOAsyncSequenceProducerDelegate { @@ -47,7 +47,7 @@ extension KafkaConsumerMessagesDelegate: NIOAsyncSequenceProducerDelegate { } func didTerminate() { - self.stateMachine.withLockedValue { $0.messageSequenceTerminated() } + self.stateMachine.withLockedValue { $0.finish() } } } @@ -152,29 +152,34 @@ public final class KafkaConsumer: Sendable, Service { private let logger: Logger /// State of the `KafkaConsumer`. private let stateMachine: NIOLockedValueBox + // TODO: refactor + private let consumerMessagesStateMachine: NIOLockedValueBox /// An asynchronous sequence containing messages from the Kafka cluster. public let messages: KafkaConsumerMessages // Private initializer, use factory method or convenience init to create KafkaConsumer /// Initialize a new ``KafkaConsumer``. - /// To listen to incoming messages, please subscribe to a list of topics using ``subscribe(topics:)`` + /// To listen to incoming messages, please subscribe to a list of topics using ``subscribe()`` /// or assign the consumer to a particular topic + partition pair using ``assign(topic:partition:offset:)``. /// /// - Parameters: /// - client: Client used for handling the connection to the Kafka cluster. /// - stateMachine: The state machine containing the state of the ``KafkaConsumer``. + /// TODO /// - configuration: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``. /// - logger: A logger. /// - Throws: A ``KafkaError`` if the initialization failed. 
private init( client: RDKafkaClient, stateMachine: NIOLockedValueBox, + consumerMessagesStateMachine: NIOLockedValueBox, configuration: KafkaConsumerConfiguration, logger: Logger ) throws { self.configuration = configuration self.stateMachine = stateMachine + self.consumerMessagesStateMachine = consumerMessagesStateMachine self.logger = logger let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence( @@ -188,7 +193,7 @@ public final class KafkaConsumer: Sendable, Service { ) } }(), - delegate: KafkaConsumerMessagesDelegate(stateMachine: self.stateMachine) + delegate: KafkaConsumerMessagesDelegate(stateMachine: self.consumerMessagesStateMachine) ) self.messages = KafkaConsumerMessages( @@ -196,7 +201,7 @@ public final class KafkaConsumer: Sendable, Service { wrappedSequence: sourceAndSequence.sequence ) - self.stateMachine.withLockedValue { + self.consumerMessagesStateMachine.withLockedValue { $0.initialize( client: client, source: sourceAndSequence.source @@ -217,8 +222,6 @@ public final class KafkaConsumer: Sendable, Service { configuration: KafkaConsumerConfiguration, logger: Logger ) throws { - let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) - var subscribedEvents: [RDKafkaEvent] = [.log] // Only listen to offset commit events when autoCommit is false if configuration.isAutoCommitEnabled == false { @@ -232,9 +235,13 @@ public final class KafkaConsumer: Sendable, Service { logger: logger ) + let stateMachine = NIOLockedValueBox(StateMachine(client: client)) + let consumerMessagesStateMachine = NIOLockedValueBox(ConsumerMessagesStateMachine()) + try self.init( client: client, stateMachine: stateMachine, + consumerMessagesStateMachine: consumerMessagesStateMachine, configuration: configuration, logger: logger ) @@ -257,8 +264,6 @@ public final class KafkaConsumer: Sendable, Service { configuration: KafkaConsumerConfiguration, logger: Logger ) throws -> (KafkaConsumer, KafkaConsumerEvents) { - let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) - var subscribedEvents: [RDKafkaEvent] = [.log] // Only listen to offset commit events when autoCommit is false if configuration.isAutoCommitEnabled == false { @@ -272,9 +277,13 @@ public final class KafkaConsumer: Sendable, Service { logger: logger ) + let stateMachine = NIOLockedValueBox(StateMachine(client: client)) + let consumerMessagesStateMachine = NIOLockedValueBox(ConsumerMessagesStateMachine()) + let consumer = try KafkaConsumer( client: client, stateMachine: stateMachine, + consumerMessagesStateMachine: consumerMessagesStateMachine, configuration: configuration, logger: logger ) @@ -311,6 +320,8 @@ public final class KafkaConsumer: Sendable, Service { } try client.subscribe(topicPartitionList: subscription) } + // TODO: refactor + self.consumerMessagesStateMachine.withLockedValue { $0.setUpConnection() } } /// Assign the``KafkaConsumer`` to a specific `partition` of a `topic`. @@ -331,6 +342,8 @@ public final class KafkaConsumer: Sendable, Service { assignment.setOffset(topic: topic, partition: partition, offset: Int64(offset.rawValue)) try client.assign(topicPartitionList: assignment) } + // TODO: refactor + self.consumerMessagesStateMachine.withLockedValue { $0.setUpConnection() } } /// Start the ``KafkaConsumer``. 
@@ -352,40 +365,86 @@ public final class KafkaConsumer: Sendable, Service { try self.subscribe(topics: topics) } - while !Task.isCancelled { - let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } - switch nextAction { - case .pollForEventsAndMessage(let client, let source): - // Event poll to serve any events queued inside of `librdkafka`. - _ = client.eventPoll() - // Poll for new consumer message. - var result: Result? - do { - if let message = try client.consumerPoll() { - result = .success(message) + // TODO: refactor, smaller functions + try await withThrowingTaskGroup(of: Void.self) { group in + // Run loop for events + group.addTask { + while !Task.isCancelled { + let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } + switch nextAction { + case .pollForEvents(let client): + // Event poll to serve any events queued inside of `librdkafka`. + _ = client.eventPoll() + try await Task.sleep(for: self.configuration.pollInterval) + case .terminatePollLoop: + return } - } catch { - result = .failure(error) } - if let result { - let yieldResult = source.yield(result) - switch yieldResult { - case .produceMore: - break - case .stopProducing: - self.stateMachine.withLockedValue { $0.stopProducing() } - case .dropped: - break + } + + // Run loop for consumer messages + group.addTask { + while !Task.isCancelled { + let nextAction = self.consumerMessagesStateMachine.withLockedValue { $0.nextPollLoopAction() } + switch nextAction { + case .pollForAndYieldMessage(let client, let source): + // Poll for new consumer message. + var result: Result? + do { + if let message = try client.consumerPoll() { + result = .success(message) + } + } catch { + result = .failure(error) + } + if let result { + let yieldResult = source.yield(result) + switch yieldResult { + case .produceMore: + break + case .stopProducing: + self.consumerMessagesStateMachine.withLockedValue { $0.stopProducing() } + case .dropped: + break + } + } else { + self.consumerMessagesStateMachine.withLockedValue { $0.waitForNewMessages() } + } + case .pollForMessageAndSleep(let client, let source): + var result: Result? + do { + if let message = try client.consumerPoll() { + result = .success(message) + } + } catch { + result = .failure(error) + } + if let result { + self.consumerMessagesStateMachine.withLockedValue { $0.produceMore() } + let yieldResult = source.yield(result) + switch yieldResult { + case .produceMore: + break + case .stopProducing: + self.consumerMessagesStateMachine.withLockedValue { $0.stopProducing() } + case .dropped: + break + } + + self.consumerMessagesStateMachine.withLockedValue { $0.newMessagesProduced() } + } + try await Task.sleep(for: self.configuration.pollInterval) + case .suspend: + try await Task.sleep(for: self.configuration.pollInterval) + case .terminatePollLoop: + return } } - try await Task.sleep(for: self.configuration.pollInterval) - case .pollForEvents(let client): - // Event poll to serve any events queued inside of `librdkafka`. 
- _ = client.eventPoll() - try await Task.sleep(for: self.configuration.pollInterval) - case .terminatePollLoop: - return } + + // Throw when one of the two child task throws + try await group.next() + try await group.next() } } @@ -455,12 +514,6 @@ public final class KafkaConsumer: Sendable, Service { client: client, logger: self.logger ) - case .triggerGracefulShutdownAndFinishSource(let client, let source): - source.finish() - self._triggerGracefulShutdown( - client: client, - logger: self.logger - ) case .none: return } @@ -470,6 +523,8 @@ public final class KafkaConsumer: Sendable, Service { client: RDKafkaClient, logger: Logger ) { + // TODO: finsish source + self.consumerMessagesStateMachine.withLockedValue { $0.finish() } do { try client.consumerClose() } catch { @@ -485,27 +540,11 @@ public final class KafkaConsumer: Sendable, Service { // MARK: - KafkaConsumer + StateMachine extension KafkaConsumer { - /// State machine representing the state of the ``KafkaConsumer``. - struct StateMachine: Sendable { - /// A logger. - let logger: Logger - - /// Represents the state of the ``KafkaConsumerMessages`` asynchronous sequence. - enum ConsumerMessagesSequenceState { - /// The sequence can take more messages. - /// - /// - Parameter source: The source for yielding new messages. - case open(source: Producer.Source) - /// Sequence suspended due to back pressure. - /// - /// - Parameter source: The source for yielding new messages. - case suspended(source: Producer.Source) - /// The sequence has finished, and no more messages will be produced. - case finished - } - - /// The state of the ``StateMachine``. - enum State: Sendable { + // TODO: revisit entire docc + // TODO: docc + struct ConsumerMessagesStateMachine: Sendable { + // TODO: revisit docc + enum State { /// The state machine has been initialized with init() but is not yet Initialized /// using `func initialize()` (required). case uninitialized @@ -513,29 +552,36 @@ extension KafkaConsumer { /// though ``subscribe()`` / ``assign()`` have not been invoked. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter messagesSequenceState: State of ``KafkaConsumerMessages`` `AsyncSequence` we use to give ``KafkaConsumerMessage``s back to the user. + // TODO: paramter source case initializing( client: RDKafkaClient, - messagesSequenceState: ConsumerMessagesSequenceState + source: Producer.Source ) - /// The ``KafkaConsumer`` is consuming messages. + /// The sequence can take more messages. /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter messagesSequenceState: State of ``KafkaConsumerMessages`` `AsyncSequence` we use to give ``KafkaConsumerMessage``s back to the user. - case consuming( + /// - Parameter client: TODO + /// - Parameter source: The source for yielding new messages. + case running( client: RDKafkaClient, - messagesSequenceState: ConsumerMessagesSequenceState + source: Producer.Source ) - /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked. - /// We are now in the process of commiting our last state to the broker. + /// Sequence suspended due to back pressure. /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case finishing(client: RDKafkaClient) - /// The ``KafkaConsumer`` is closed. + /// - Parameter source: The source for yielding new messages. 
+ case suspended( + client: RDKafkaClient, + source: Producer.Source + ) + // TODO: docc read to end and now waiting for new message + case waitingForMessages( + client: RDKafkaClient, + source: Producer.Source + ) + /// The sequence has finished, and no more messages will be produced. case finished } - /// The current state of the StateMachine. + // TODO: docc var state: State = .uninitialized /// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are @@ -549,7 +595,7 @@ extension KafkaConsumer { } self.state = .initializing( client: client, - messagesSequenceState: .open(source: source) + source: source ) } @@ -559,16 +605,15 @@ extension KafkaConsumer { /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case pollForEventsAndMessage( + case pollForAndYieldMessage( client: RDKafkaClient, source: Producer.Source ) - /// Serve any queued callbacks on the event queue. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case pollForEvents( - client: RDKafkaClient + case pollForMessageAndSleep( + client: RDKafkaClient, + source: Producer.Source ) + case suspend // TODO: sleep for poll interval /// Terminate the poll loop. case terminatePollLoop } @@ -583,85 +628,91 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .consuming(let client, let messagesSequenceState): - switch messagesSequenceState { - case .open(let source): - return .pollForEventsAndMessage(client: client, source: source) - case .suspended, .finished: - return .pollForEvents(client: client) - } - case .finishing(let client): - if client.isConsumerClosed { - self.state = .finished - return .terminatePollLoop - } else { - return .pollForEvents(client: client) - } + case .running(let client, let source): + return .pollForAndYieldMessage(client: client, source: source) + case .suspended: + return .suspend + case .waitingForMessages(let client, let source): + return .pollForMessageAndSleep(client: client, source: source) case .finished: return .terminatePollLoop } } - /// Action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. - enum SetUpConnectionAction { - /// Set up the connection through ``subscribe()`` or ``assign()``. - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case setUpConnection(client: RDKafkaClient) + mutating func setUpConnection() { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing(let client, let source): + self.state = .running(client: client, source: source) + case .running, .suspended, .waitingForMessages, .finished: + fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer") + } } - /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. - /// - /// - Returns: The action to be taken. 
- mutating func setUpConnection() -> SetUpConnectionAction { + // TODO: there are new messages to read + mutating func newMessagesProduced() { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing(let client, let messagesSequenceState): - self.state = .consuming( - client: client, - messagesSequenceState: messagesSequenceState - ) - return .setUpConnection(client: client) - case .consuming, .finishing, .finished: - fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer") + case .initializing: + fatalError("Call to \(#function) before setUpConnection() was invoked") + case .running: + fatalError("\(#function) should not be invoked in state \(self.state)") + case .suspended: + fatalError("\(#function) should not be invoked in state \(self.state)") + case .waitingForMessages(let client, let source): + self.state = .running(client: client, source: source) + case .finished: + fatalError("finish() must not be invoked more than once") } } - /// The messages asynchronous sequence was terminated. - /// Stop polling for new messages. - mutating func messageSequenceTerminated() { + mutating func waitForNewMessages() { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Call to \(#function) before setUpConnection() was invoked") - case .consuming(let client, let messagesSequenceState): - switch messagesSequenceState { - case .suspended, .open: - self.state = .consuming(client: client, messagesSequenceState: .finished) - case .finished: - fatalError("messageSequenceTerminated() must not be invoked more than once") - } - case .finishing, .finished: + case .running(let client, let source): + self.state = .waitingForMessages(client: client, source: source) + case .suspended: + fatalError("\(#function) should not be invoked in state \(self.state)") + case .waitingForMessages: + fatalError("\(#function) should not be invoked in state \(self.state)") + case .finished: + fatalError("finish() must not be invoked more than once") + } + } + + mutating func finish() { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + fatalError("Call to \(#function) before setUpConnection() was invoked") + case .running, .suspended, .waitingForMessages: + self.state = .finished + case .finished: break } } /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to produce more messages. 
mutating func produceMore() { + // TODO: group cases switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: break // This case can be triggered by `NIOAsyncSequenceProducerDelegate`, ignore - case .consuming(let client, let messagesSequenceState): - switch messagesSequenceState { - case .open, .finished: - break - case .suspended(let source): - self.state = .consuming(client: client, messagesSequenceState: .open(source: source)) - } - case .finishing, .finished: + case .running: + break + case .suspended(let client, let source): + self.state = .running(client: client, source: source) + case .waitingForMessages: + break + case .finished: break } } @@ -673,15 +724,89 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Call to \(#function) before setUpConnection() was invoked") - case .consuming(let client, let messagesSequenceState): - switch messagesSequenceState { - case .suspended, .finished: - break - case .open(let source): - self.state = .consuming(client: client, messagesSequenceState: .suspended(source: source)) + case .running(let client, let source): + self.state = .suspended(client: client, source: source) + case .suspended: + break + case .waitingForMessages(let client, let source): + self.state = .suspended(client: client, source: source) + case .finished: + break + } + } + } + + /// State machine representing the state of the ``KafkaConsumer``. + struct StateMachine: Sendable { + /// The state of the ``StateMachine``. + enum State: Sendable { + /// The ``KafkaConsumer`` is consuming messages. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + case running(client: RDKafkaClient) + /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked. + /// We are now in the process of commiting our last state to the broker. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + case finishing(client: RDKafkaClient) + /// The ``KafkaConsumer`` is closed. + case finished + } + + /// The current state of the StateMachine. + var state: State + + // TODO: docc + init(client: RDKafkaClient) { + self.state = .running(client: client) + } + + /// Action to be taken when wanting to poll for a new message. + enum PollLoopAction { + /// Serve any queued callbacks on the event queue. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + case pollForEvents(client: RDKafkaClient) + /// Terminate the poll loop. + case terminatePollLoop + } + + /// Returns the next action to be taken when wanting to poll. + /// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken. + /// + /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. + mutating func nextPollLoopAction() -> PollLoopAction { + switch self.state { + case .running(let client): + return .pollForEvents(client: client) + case .finishing(let client): + if client.isConsumerClosed { + self.state = .finished + return .terminatePollLoop + } else { + return .pollForEvents(client: client) } + case .finished: + return .terminatePollLoop + } + } + + /// Action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. + enum SetUpConnectionAction { + /// Set up the connection through ``subscribe()`` or ``assign()``. 
+ /// - Parameter client: Client used for handling the connection to the Kafka cluster. + case setUpConnection(client: RDKafkaClient) + } + + /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. + /// + /// - Returns: The action to be taken. + mutating func setUpConnection() -> SetUpConnectionAction { + switch self.state { + case .running(let client): + return .setUpConnection(client: client) case .finishing, .finished: - break + fatalError("\(#function) should only be invoked when KafkaConsumer is running") } } @@ -698,11 +823,7 @@ extension KafkaConsumer { /// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`). func storeOffset() -> StoreOffsetAction { switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consuming(let client, _): + case .running(let client): return .storeOffset(client: client) case .finishing, .finished: return .terminateConsumerSequence @@ -714,9 +835,7 @@ extension KafkaConsumer { /// Do a commit. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case commit( - client: RDKafkaClient - ) + case commit(client: RDKafkaClient) /// Throw an error. The ``KafkaConsumer`` is closed. case throwClosedError } @@ -727,11 +846,7 @@ extension KafkaConsumer { /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. func commit() -> CommitAction { switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consuming(let client, _): + case .running(let client): return .commit(client: client) case .finishing, .finished: return .throwClosedError @@ -743,17 +858,7 @@ extension KafkaConsumer { /// Shut down the ``KafkaConsumer``. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case triggerGracefulShutdown( - client: RDKafkaClient - ) - /// Shut down the ``KafkaConsumer`` and finish the given `source` object. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case triggerGracefulShutdownAndFinishSource( - client: RDKafkaClient, - source: Producer.Source - ) + case triggerGracefulShutdown(client: RDKafkaClient) } /// Get action to be taken when wanting to do close the consumer. @@ -762,27 +867,9 @@ extension KafkaConsumer { /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. mutating func finish() -> FinishAction? 
{ switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("subscribe() / assign() should have been invoked before \(#function)") - case .consuming(let client, let messagesSequenceState): + case .running(let client): self.state = .finishing(client: client) - - switch messagesSequenceState { - case .open(let source): - return .triggerGracefulShutdownAndFinishSource( - client: client, - source: source - ) - case .suspended(let source): - return .triggerGracefulShutdownAndFinishSource( - client: client, - source: source - ) - case .finished: - return .triggerGracefulShutdown(client: client) - } + return .triggerGracefulShutdown(client: client) case .finishing, .finished: return nil } diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift index e6cf82e5..c8f98212 100644 --- a/Tests/IntegrationTests/KafkaTests.swift +++ b/Tests/IntegrationTests/KafkaTests.swift @@ -599,6 +599,56 @@ final class KafkaTests: XCTestCase { } } + // TODO: remove + func testDelete() async throws { + var consumerConfig = KafkaConsumerConfiguration( + consumptionStrategy: .partition( + KafkaPartition(rawValue: 0), + topic: "test-topic", + offset: KafkaOffset(rawValue: 0) // Important: Read from beginning! + ), + bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] + ) + consumerConfig.pollInterval = .milliseconds(100) + consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning + consumerConfig.broker.addressFamily = .v4 + + let consumer = try KafkaConsumer( + configuration: consumerConfig, + logger: .kafkaTest + ) + + let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], logger: .kafkaTest) + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // Consumer Task + group.addTask { + var count = 0 + for try await message in consumer.messages { + _ = message // drop message + count += 1 +// try await Task.sleep(for: .milliseconds(1)) + if count % 1000 == 0 { + print(count) + } + if count == 1_000_000 { + break + } + } + } + + // Wait for Consumer Task to complete + try await group.next() + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + } // MARK: - Helpers private static func createTestMessages( From 848b8c3d84524e0dc2ceb66929dc36b1f65fe5d7 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Sat, 14 Oct 2023 13:54:15 +0100 Subject: [PATCH 3/5] KafkaConsumer: merge both state machines --- Sources/Kafka/KafkaConsumer.swift | 419 +++++++++++++++--------------- 1 file changed, 209 insertions(+), 210 deletions(-) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index b28533e5..53c0f9a9 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -38,7 +38,7 @@ extension KafkaConsumerEventsDelegate: NIOAsyncSequenceProducerDelegate { /// `NIOAsyncSequenceProducerDelegate` for ``KafkaConsumerMessages``. 
internal struct KafkaConsumerMessagesDelegate: Sendable { - let stateMachine: NIOLockedValueBox + let stateMachine: NIOLockedValueBox } extension KafkaConsumerMessagesDelegate: NIOAsyncSequenceProducerDelegate { @@ -47,7 +47,7 @@ extension KafkaConsumerMessagesDelegate: NIOAsyncSequenceProducerDelegate { } func didTerminate() { - self.stateMachine.withLockedValue { $0.finish() } + self.stateMachine.withLockedValue { $0.finishMessageConsumption() } } } @@ -152,8 +152,6 @@ public final class KafkaConsumer: Sendable, Service { private let logger: Logger /// State of the `KafkaConsumer`. private let stateMachine: NIOLockedValueBox - // TODO: refactor - private let consumerMessagesStateMachine: NIOLockedValueBox /// An asynchronous sequence containing messages from the Kafka cluster. public let messages: KafkaConsumerMessages @@ -166,20 +164,17 @@ public final class KafkaConsumer: Sendable, Service { /// - Parameters: /// - client: Client used for handling the connection to the Kafka cluster. /// - stateMachine: The state machine containing the state of the ``KafkaConsumer``. - /// TODO /// - configuration: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``. /// - logger: A logger. /// - Throws: A ``KafkaError`` if the initialization failed. private init( client: RDKafkaClient, stateMachine: NIOLockedValueBox, - consumerMessagesStateMachine: NIOLockedValueBox, configuration: KafkaConsumerConfiguration, logger: Logger ) throws { self.configuration = configuration self.stateMachine = stateMachine - self.consumerMessagesStateMachine = consumerMessagesStateMachine self.logger = logger let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence( @@ -193,7 +188,7 @@ public final class KafkaConsumer: Sendable, Service { ) } }(), - delegate: KafkaConsumerMessagesDelegate(stateMachine: self.consumerMessagesStateMachine) + delegate: KafkaConsumerMessagesDelegate(stateMachine: self.stateMachine) ) self.messages = KafkaConsumerMessages( @@ -201,7 +196,7 @@ public final class KafkaConsumer: Sendable, Service { wrappedSequence: sourceAndSequence.sequence ) - self.consumerMessagesStateMachine.withLockedValue { + self.stateMachine.withLockedValue { $0.initialize( client: client, source: sourceAndSequence.source @@ -235,13 +230,11 @@ public final class KafkaConsumer: Sendable, Service { logger: logger ) - let stateMachine = NIOLockedValueBox(StateMachine(client: client)) - let consumerMessagesStateMachine = NIOLockedValueBox(ConsumerMessagesStateMachine()) + let stateMachine = NIOLockedValueBox(StateMachine()) try self.init( client: client, stateMachine: stateMachine, - consumerMessagesStateMachine: consumerMessagesStateMachine, configuration: configuration, logger: logger ) @@ -277,13 +270,11 @@ public final class KafkaConsumer: Sendable, Service { logger: logger ) - let stateMachine = NIOLockedValueBox(StateMachine(client: client)) - let consumerMessagesStateMachine = NIOLockedValueBox(ConsumerMessagesStateMachine()) + let stateMachine = NIOLockedValueBox(StateMachine()) let consumer = try KafkaConsumer( client: client, stateMachine: stateMachine, - consumerMessagesStateMachine: consumerMessagesStateMachine, configuration: configuration, logger: logger ) @@ -320,8 +311,6 @@ public final class KafkaConsumer: Sendable, Service { } try client.subscribe(topicPartitionList: subscription) } - // TODO: refactor - self.consumerMessagesStateMachine.withLockedValue { $0.setUpConnection() } } /// Assign the``KafkaConsumer`` to a specific `partition` of a `topic`. 
@@ -342,8 +331,6 @@ public final class KafkaConsumer: Sendable, Service { assignment.setOffset(topic: topic, partition: partition, offset: Int64(offset.rawValue)) try client.assign(topicPartitionList: assignment) } - // TODO: refactor - self.consumerMessagesStateMachine.withLockedValue { $0.setUpConnection() } } /// Start the ``KafkaConsumer``. @@ -385,7 +372,7 @@ public final class KafkaConsumer: Sendable, Service { // Run loop for consumer messages group.addTask { while !Task.isCancelled { - let nextAction = self.consumerMessagesStateMachine.withLockedValue { $0.nextPollLoopAction() } + let nextAction = self.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } switch nextAction { case .pollForAndYieldMessage(let client, let source): // Poll for new consumer message. @@ -403,12 +390,12 @@ public final class KafkaConsumer: Sendable, Service { case .produceMore: break case .stopProducing: - self.consumerMessagesStateMachine.withLockedValue { $0.stopProducing() } + self.stateMachine.withLockedValue { $0.stopProducing() } case .dropped: break } } else { - self.consumerMessagesStateMachine.withLockedValue { $0.waitForNewMessages() } + self.stateMachine.withLockedValue { $0.waitForNewMessages() } } case .pollForMessageAndSleep(let client, let source): var result: Result? @@ -420,18 +407,18 @@ public final class KafkaConsumer: Sendable, Service { result = .failure(error) } if let result { - self.consumerMessagesStateMachine.withLockedValue { $0.produceMore() } + self.stateMachine.withLockedValue { $0.produceMore() } let yieldResult = source.yield(result) switch yieldResult { case .produceMore: break case .stopProducing: - self.consumerMessagesStateMachine.withLockedValue { $0.stopProducing() } + self.stateMachine.withLockedValue { $0.stopProducing() } case .dropped: break } - self.consumerMessagesStateMachine.withLockedValue { $0.newMessagesProduced() } + self.stateMachine.withLockedValue { $0.newMessagesProduced() } } try await Task.sleep(for: self.configuration.pollInterval) case .suspend: @@ -523,8 +510,6 @@ public final class KafkaConsumer: Sendable, Service { client: RDKafkaClient, logger: Logger ) { - // TODO: finsish source - self.consumerMessagesStateMachine.withLockedValue { $0.finish() } do { try client.consumerClose() } catch { @@ -540,11 +525,26 @@ public final class KafkaConsumer: Sendable, Service { // MARK: - KafkaConsumer + StateMachine extension KafkaConsumer { - // TODO: revisit entire docc - // TODO: docc - struct ConsumerMessagesStateMachine: Sendable { + /// State machine representing the state of the ``KafkaConsumer``. + struct StateMachine: Sendable { // TODO: revisit docc - enum State { + enum ConsumerMessagesState { + /// The sequence can take more messages. + /// + /// - Parameter source: The source for yielding new messages. + case running(source: Producer.Source) + /// Sequence suspended due to back pressure. + /// + /// - Parameter source: The source for yielding new messages. + case suspended(source: Producer.Source) + // TODO: docc read to end and now waiting for new message + case waitingForMessages(source: Producer.Source) + /// The sequence has finished, and no more messages will be produced. + case finished + } + + /// The state of the ``StateMachine``. + enum State: Sendable { /// The state machine has been initialized with init() but is not yet Initialized /// using `func initialize()` (required). 
case uninitialized @@ -557,31 +557,20 @@ extension KafkaConsumer { client: RDKafkaClient, source: Producer.Source ) - /// The sequence can take more messages. + /// The ``KafkaConsumer`` is consuming messages. /// - /// - Parameter client: TODO - /// - Parameter source: The source for yielding new messages. - case running( - client: RDKafkaClient, - source: Producer.Source - ) - /// Sequence suspended due to back pressure. + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + case running(client: RDKafkaClient, state: ConsumerMessagesState) + /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked. + /// We are now in the process of commiting our last state to the broker. /// - /// - Parameter source: The source for yielding new messages. - case suspended( - client: RDKafkaClient, - source: Producer.Source - ) - // TODO: docc read to end and now waiting for new message - case waitingForMessages( - client: RDKafkaClient, - source: Producer.Source - ) - /// The sequence has finished, and no more messages will be produced. + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + case finishing(client: RDKafkaClient) + /// The ``KafkaConsumer`` is closed. case finished } - // TODO: docc + /// The current state of the StateMachine. var state: State = .uninitialized /// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are @@ -599,21 +588,14 @@ extension KafkaConsumer { ) } + // TODO: general: group events + /// Action to be taken when wanting to poll for a new message. - enum PollLoopAction { - /// Poll for a new ``KafkaConsumerMessage``. + enum PollLoopAction { // TODO: eventPollLoop + /// Serve any queued callbacks on the event queue. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case pollForAndYieldMessage( - client: RDKafkaClient, - source: Producer.Source - ) - case pollForMessageAndSleep( - client: RDKafkaClient, - source: Producer.Source - ) - case suspend // TODO: sleep for poll interval + case pollForEvents(client: RDKafkaClient) /// Terminate the poll loop. case terminatePollLoop } @@ -622,151 +604,41 @@ extension KafkaConsumer { /// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken. /// /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. 
- mutating func nextPollLoopAction() -> PollLoopAction { + mutating func nextPollLoopAction() -> PollLoopAction { // TODO: eventPollLoop switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, let source): - return .pollForAndYieldMessage(client: client, source: source) - case .suspended: - return .suspend - case .waitingForMessages(let client, let source): - return .pollForMessageAndSleep(client: client, source: source) + case .running(let client, _): + return .pollForEvents(client: client) + case .finishing(let client): + if client.isConsumerClosed { + self.state = .finished + return .terminatePollLoop + } else { + return .pollForEvents(client: client) + } case .finished: return .terminatePollLoop } } - mutating func setUpConnection() { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing(let client, let source): - self.state = .running(client: client, source: source) - case .running, .suspended, .waitingForMessages, .finished: - fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer") - } - } - - // TODO: there are new messages to read - mutating func newMessagesProduced() { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("Call to \(#function) before setUpConnection() was invoked") - case .running: - fatalError("\(#function) should not be invoked in state \(self.state)") - case .suspended: - fatalError("\(#function) should not be invoked in state \(self.state)") - case .waitingForMessages(let client, let source): - self.state = .running(client: client, source: source) - case .finished: - fatalError("finish() must not be invoked more than once") - } - } - - mutating func waitForNewMessages() { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("Call to \(#function) before setUpConnection() was invoked") - case .running(let client, let source): - self.state = .waitingForMessages(client: client, source: source) - case .suspended: - fatalError("\(#function) should not be invoked in state \(self.state)") - case .waitingForMessages: - fatalError("\(#function) should not be invoked in state \(self.state)") - case .finished: - fatalError("finish() must not be invoked more than once") - } - } - - mutating func finish() { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("Call to \(#function) before setUpConnection() was invoked") - case .running, .suspended, .waitingForMessages: - self.state = .finished - case .finished: - break - } - } - - /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to produce more messages. 
- mutating func produceMore() { - // TODO: group cases - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - break // This case can be triggered by `NIOAsyncSequenceProducerDelegate`, ignore - case .running: - break - case .suspended(let client, let source): - self.state = .running(client: client, source: source) - case .waitingForMessages: - break - case .finished: - break - } - } - - /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to temporarily stop producing messages. - mutating func stopProducing() { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("Call to \(#function) before setUpConnection() was invoked") - case .running(let client, let source): - self.state = .suspended(client: client, source: source) - case .suspended: - break - case .waitingForMessages(let client, let source): - self.state = .suspended(client: client, source: source) - case .finished: - break - } - } - } - - /// State machine representing the state of the ``KafkaConsumer``. - struct StateMachine: Sendable { - /// The state of the ``StateMachine``. - enum State: Sendable { - /// The ``KafkaConsumer`` is consuming messages. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case running(client: RDKafkaClient) - /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked. - /// We are now in the process of commiting our last state to the broker. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case finishing(client: RDKafkaClient) - /// The ``KafkaConsumer`` is closed. - case finished - } - - /// The current state of the StateMachine. - var state: State - - // TODO: docc - init(client: RDKafkaClient) { - self.state = .running(client: client) - } - /// Action to be taken when wanting to poll for a new message. - enum PollLoopAction { - /// Serve any queued callbacks on the event queue. + enum ConsumerPollLoopAction { + /// Poll for a new ``KafkaConsumerMessage``. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case pollForEvents(client: RDKafkaClient) + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + case pollForAndYieldMessage( + client: RDKafkaClient, + source: Producer.Source + ) + case pollForMessageAndSleep( + client: RDKafkaClient, + source: Producer.Source + ) + case suspend // TODO: sleep for poll interval /// Terminate the poll loop. case terminatePollLoop } @@ -775,17 +647,29 @@ extension KafkaConsumer { /// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken. /// /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. 
- mutating func nextPollLoopAction() -> PollLoopAction { - switch self.state { - case .running(let client): - return .pollForEvents(client: client) - case .finishing(let client): - if client.isConsumerClosed { - self.state = .finished - return .terminatePollLoop - } else { - return .pollForEvents(client: client) - } + mutating func nextConsumerPollLoopAction() -> ConsumerPollLoopAction { + // TODO: refactor + if case .finishing = self.state { + return .terminatePollLoop + } + // TODO: refactor + if case .finished = self.state { + return .terminatePollLoop + } + + guard case let .running(client, consumerState) = self.state else { + fatalError("\(#function) invoked while still in state \(self.state)") + } + + // TODO: terminate poll loop if consumer is finished + + switch consumerState { + case .running(let source): + return .pollForAndYieldMessage(client: client, source: source) + case .suspended: + return .suspend + case .waitingForMessages(let source): + return .pollForMessageAndSleep(client: client, source: source) case .finished: return .terminatePollLoop } @@ -803,8 +687,13 @@ extension KafkaConsumer { /// - Returns: The action to be taken. mutating func setUpConnection() -> SetUpConnectionAction { switch self.state { - case .running(let client): + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing(let client, let source): + self.state = .running(client: client, state: .running(source: source)) return .setUpConnection(client: client) + case .running: + fatalError("\(#function) should not be invoked more than once") case .finishing, .finished: fatalError("\(#function) should only be invoked when KafkaConsumer is running") } @@ -823,7 +712,11 @@ extension KafkaConsumer { /// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`). func storeOffset() -> StoreOffsetAction { switch self.state { - case .running(let client): + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, _): return .storeOffset(client: client) case .finishing, .finished: return .terminateConsumerSequence @@ -846,13 +739,35 @@ extension KafkaConsumer { /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. func commit() -> CommitAction { switch self.state { - case .running(let client): + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, _): return .commit(client: client) case .finishing, .finished: return .throwClosedError } } + // TODO: docc + move + mutating func finishMessageConsumption() { + // TODO: refactor + if case .finishing = self.state { + return + } + // TODO: refactor + if case .finished = self.state { + return + } + + guard case let .running(client, _) = self.state else { + fatalError("\(#function) invoked while still in state \(self.state)") + } + + self.state = .running(client: client, state: .finished) + } + /// Action to be taken when wanting to do close the consumer. enum FinishAction { /// Shut down the ``KafkaConsumer``. @@ -867,12 +782,96 @@ extension KafkaConsumer { /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. 
mutating func finish() -> FinishAction? { switch self.state { - case .running(let client): + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, _): self.state = .finishing(client: client) return .triggerGracefulShutdown(client: client) case .finishing, .finished: return nil } } + + // MARK: - Consumer Messages Loop Actions + + // TODO: docc there are new messages to read + mutating func newMessagesProduced() { + guard case let .running(client, consumerState) = self.state else { + fatalError("\(#function) invoked while still in state \(self.state)") + } + + switch consumerState { + case .running: + fatalError("\(#function) should not be invoked in state \(self.state)") + case .suspended: + fatalError("\(#function) should not be invoked in state \(self.state)") + case .waitingForMessages(let source): + self.state = .running(client: client, state: .running(source: source)) // TODO: refactor + case .finished: + fatalError("finish() must not be invoked more than once") + } + } + + mutating func waitForNewMessages() { + guard case let .running(client, consumerState) = self.state else { + fatalError("\(#function) invoked while still in state \(self.state)") + } + + switch consumerState { + case .running(let source): + self.state = .running(client: client, state: .waitingForMessages(source: source)) + case .suspended: + fatalError("\(#function) should not be invoked in state \(self.state)") + case .waitingForMessages: + fatalError("\(#function) should not be invoked in state \(self.state)") + case .finished: + fatalError("finish() must not be invoked more than once") + } + } + + /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to produce more messages. + mutating func produceMore() { + // TODO: refactor + if case .initializing = self.state { + return + } + + guard case let .running(client, consumerState) = self.state else { + fatalError("\(#function) invoked while still in state \(self.state)") + } + + // TODO: group cases + switch consumerState { + case .running: + break + case .suspended(let source): + self.state = .running(client: client, state: .running(source: source)) + case .waitingForMessages: + break + case .finished: // TODO: do we need that? + break + } + } + + /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to temporarily stop producing messages. 
+ mutating func stopProducing() { + guard case let .running(client, consumerState) = self.state else { + fatalError("\(#function) invoked while still in state \(self.state)") + } + + switch consumerState { + case .running(let source): + self.state = .running(client: client, state: .suspended(source: source)) + case .suspended: + break + case .waitingForMessages(let source): + self.state = .running(client: client, state: .suspended(source: source)) + case .finished: + break + } + } + } } From 6686e982c3c1c905acc0a6f4653cdf903f6738ff Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Sun, 15 Oct 2023 23:57:48 +0100 Subject: [PATCH 4/5] Refactor + DocC --- Sources/Kafka/KafkaConsumer.swift | 311 ++++++++++++------------ Tests/IntegrationTests/KafkaTests.swift | 50 ---- 2 files changed, 150 insertions(+), 211 deletions(-) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 53c0f9a9..a76ef1f6 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -352,81 +352,13 @@ public final class KafkaConsumer: Sendable, Service { try self.subscribe(topics: topics) } - // TODO: refactor, smaller functions try await withThrowingTaskGroup(of: Void.self) { group in - // Run loop for events group.addTask { - while !Task.isCancelled { - let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } - switch nextAction { - case .pollForEvents(let client): - // Event poll to serve any events queued inside of `librdkafka`. - _ = client.eventPoll() - try await Task.sleep(for: self.configuration.pollInterval) - case .terminatePollLoop: - return - } - } + try await self.eventRunLoop() } - // Run loop for consumer messages group.addTask { - while !Task.isCancelled { - let nextAction = self.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } - switch nextAction { - case .pollForAndYieldMessage(let client, let source): - // Poll for new consumer message. - var result: Result? - do { - if let message = try client.consumerPoll() { - result = .success(message) - } - } catch { - result = .failure(error) - } - if let result { - let yieldResult = source.yield(result) - switch yieldResult { - case .produceMore: - break - case .stopProducing: - self.stateMachine.withLockedValue { $0.stopProducing() } - case .dropped: - break - } - } else { - self.stateMachine.withLockedValue { $0.waitForNewMessages() } - } - case .pollForMessageAndSleep(let client, let source): - var result: Result? - do { - if let message = try client.consumerPoll() { - result = .success(message) - } - } catch { - result = .failure(error) - } - if let result { - self.stateMachine.withLockedValue { $0.produceMore() } - let yieldResult = source.yield(result) - switch yieldResult { - case .produceMore: - break - case .stopProducing: - self.stateMachine.withLockedValue { $0.stopProducing() } - case .dropped: - break - } - - self.stateMachine.withLockedValue { $0.newMessagesProduced() } - } - try await Task.sleep(for: self.configuration.pollInterval) - case .suspend: - try await Task.sleep(for: self.configuration.pollInterval) - case .terminatePollLoop: - return - } - } + try await self.messageRunLoop() } // Throw when one of the two child task throws @@ -435,6 +367,81 @@ public final class KafkaConsumer: Sendable, Service { } } + /// Run loop polling Kafka for new events. 
+ private func eventRunLoop() async throws { + while !Task.isCancelled { + let nextAction = self.stateMachine.withLockedValue { $0.nextEventPollLoopAction() } + switch nextAction { + case .pollForEvents(let client): + // Event poll to serve any events queued inside of `librdkafka`. + _ = client.eventPoll() + try await Task.sleep(for: self.configuration.pollInterval) + case .terminatePollLoop: + return + } + } + } + + /// Run loop polling Kafka for new consumer messages. + private func messageRunLoop() async throws { + while !Task.isCancelled { + let nextAction = self.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } + switch nextAction { + case .pollForAndYieldMessage(let client, let source): + // Poll for new consumer message. + var result: Result? + do { + if let message = try client.consumerPoll() { + result = .success(message) + } + } catch { + result = .failure(error) + } + if let result { + let yieldResult = source.yield(result) + switch yieldResult { + case .produceMore: + break + case .stopProducing: + self.stateMachine.withLockedValue { $0.stopProducing() } + case .dropped: + break + } + } else { + self.stateMachine.withLockedValue { $0.waitForNewMessages() } + } + case .pollForMessageAndSleep(let client, let source): + var result: Result? + do { + if let message = try client.consumerPoll() { + result = .success(message) + } + } catch { + result = .failure(error) + } + if let result { + self.stateMachine.withLockedValue { $0.produceMore() } + let yieldResult = source.yield(result) + switch yieldResult { + case .produceMore: + break + case .stopProducing: + self.stateMachine.withLockedValue { $0.stopProducing() } + case .dropped: + break + } + + self.stateMachine.withLockedValue { $0.newMessagesProduced() } + } + try await Task.sleep(for: self.configuration.pollInterval) + case .suspendPollLoop: + try await Task.sleep(for: self.configuration.pollInterval) + case .terminatePollLoop: + return + } + } + } + /// Mark all messages up to the passed message in the topic as read. /// Schedules a commit and returns immediately. /// Any errors encountered after scheduling the commit will be discarded. @@ -527,8 +534,8 @@ public final class KafkaConsumer: Sendable, Service { extension KafkaConsumer { /// State machine representing the state of the ``KafkaConsumer``. struct StateMachine: Sendable { - // TODO: revisit docc - enum ConsumerMessagesState { + /// State of the event loop fetching new consumer messages. + enum MessagePollLoopState { /// The sequence can take more messages. /// /// - Parameter source: The source for yielding new messages. @@ -537,7 +544,10 @@ extension KafkaConsumer { /// /// - Parameter source: The source for yielding new messages. case suspended(source: Producer.Source) - // TODO: docc read to end and now waiting for new message + /// We have read to the end of a partition and are now waiting for new messages + /// to be produced. + /// + /// - Parameter source: The source for yielding new messages. case waitingForMessages(source: Producer.Source) /// The sequence has finished, and no more messages will be produced. case finished @@ -552,7 +562,7 @@ extension KafkaConsumer { /// though ``subscribe()`` / ``assign()`` have not been invoked. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - // TODO: paramter source + /// - Parameter source: The source for yielding new messages. 
case initializing( client: RDKafkaClient, source: Producer.Source @@ -560,7 +570,8 @@ extension KafkaConsumer { /// The ``KafkaConsumer`` is consuming messages. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case running(client: RDKafkaClient, state: ConsumerMessagesState) + /// - Parameter state: State of the event loop fetching new consumer messages. + case running(client: RDKafkaClient, messagePollLoopState: MessagePollLoopState) /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked. /// We are now in the process of commiting our last state to the broker. /// @@ -588,10 +599,8 @@ extension KafkaConsumer { ) } - // TODO: general: group events - /// Action to be taken when wanting to poll for a new message. - enum PollLoopAction { // TODO: eventPollLoop + enum EventPollLoopAction { /// Serve any queued callbacks on the event queue. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. @@ -604,7 +613,7 @@ extension KafkaConsumer { /// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken. /// /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. - mutating func nextPollLoopAction() -> PollLoopAction { // TODO: eventPollLoop + mutating func nextEventPollLoopAction() -> EventPollLoopAction { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") @@ -634,11 +643,16 @@ extension KafkaConsumer { client: RDKafkaClient, source: Producer.Source ) + /// Poll for a new ``KafkaConsumerMessage`` and sleep for ``KafkaConsumerConfiguration/pollInterval``. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. case pollForMessageAndSleep( client: RDKafkaClient, source: Producer.Source ) - case suspend // TODO: sleep for poll interval + /// Sleep for ``KafkaConsumerConfiguration/pollInterval``. + case suspendPollLoop /// Terminate the poll loop. case terminatePollLoop } @@ -648,29 +662,23 @@ extension KafkaConsumer { /// /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. 
mutating func nextConsumerPollLoopAction() -> ConsumerPollLoopAction { - // TODO: refactor - if case .finishing = self.state { - return .terminatePollLoop - } - // TODO: refactor - if case .finished = self.state { - return .terminatePollLoop - } - - guard case let .running(client, consumerState) = self.state else { + switch self.state { + case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - } - - // TODO: terminate poll loop if consumer is finished - - switch consumerState { - case .running(let source): - return .pollForAndYieldMessage(client: client, source: source) - case .suspended: - return .suspend - case .waitingForMessages(let source): - return .pollForMessageAndSleep(client: client, source: source) - case .finished: + case .initializing: + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, let consumerState): + switch consumerState { + case .running(let source): + return .pollForAndYieldMessage(client: client, source: source) + case .suspended(source: _): + return .suspendPollLoop + case .waitingForMessages(let source): + return .pollForMessageAndSleep(client: client, source: source) + case .finished: + return .terminatePollLoop + } + case .finishing, .finished: return .terminatePollLoop } } @@ -690,7 +698,7 @@ extension KafkaConsumer { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing(let client, let source): - self.state = .running(client: client, state: .running(source: source)) + self.state = .running(client: client, messagePollLoopState: .running(source: source)) return .setUpConnection(client: client) case .running: fatalError("\(#function) should not be invoked more than once") @@ -750,24 +758,6 @@ extension KafkaConsumer { } } - // TODO: docc + move - mutating func finishMessageConsumption() { - // TODO: refactor - if case .finishing = self.state { - return - } - // TODO: refactor - if case .finished = self.state { - return - } - - guard case let .running(client, _) = self.state else { - fatalError("\(#function) invoked while still in state \(self.state)") - } - - self.state = .running(client: client, state: .finished) - } - /// Action to be taken when wanting to do close the consumer. enum FinishAction { /// Shut down the ``KafkaConsumer``. @@ -794,84 +784,83 @@ extension KafkaConsumer { } } - // MARK: - Consumer Messages Loop Actions + // MARK: - Consumer Messages Poll Loop Actions - // TODO: docc there are new messages to read + /// The partition that was previously finished reading has got new messages produced to it. 
 mutating func newMessagesProduced() {
- guard case let .running(client, consumerState) = self.state else {
+ guard case .running(let client, let consumerState) = self.state else {
 fatalError("\(#function) invoked while still in state \(self.state)")
 }

 switch consumerState {
- case .running:
- fatalError("\(#function) should not be invoked in state \(self.state)")
- case .suspended:
+ case .running, .suspended, .finished:
 fatalError("\(#function) should not be invoked in state \(self.state)")
 case .waitingForMessages(let source):
- self.state = .running(client: client, state: .running(source: source)) // TODO: refactor
- case .finished:
- fatalError("finish() must not be invoked more than once")
+ self.state = .running(client: client, messagePollLoopState: .running(source: source))
 }
 }

+ /// The consumer has read to the end of a partition and shall now go into a sleep loop until new messages are produced.
 mutating func waitForNewMessages() {
- guard case let .running(client, consumerState) = self.state else {
+ guard case .running(let client, let consumerState) = self.state else {
 fatalError("\(#function) invoked while still in state \(self.state)")
 }

 switch consumerState {
 case .running(let source):
- self.state = .running(client: client, state: .waitingForMessages(source: source))
- case .suspended:
- fatalError("\(#function) should not be invoked in state \(self.state)")
- case .waitingForMessages:
+ self.state = .running(client: client, messagePollLoopState: .waitingForMessages(source: source))
+ case .suspended, .waitingForMessages, .finished:
 fatalError("\(#function) should not be invoked in state \(self.state)")
- case .finished:
- fatalError("finish() must not be invoked more than once")
 }
 }

 /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to produce more messages.
 mutating func produceMore() {
- // TODO: refactor
- if case .initializing = self.state {
- return
- }
-
- guard case let .running(client, consumerState) = self.state else {
+ switch self.state {
+ case .uninitialized:
 fatalError("\(#function) invoked while still in state \(self.state)")
- }
-
- // TODO: group cases
- switch consumerState {
- case .running:
- break
- case .suspended(let source):
- self.state = .running(client: client, state: .running(source: source))
- case .waitingForMessages:
- break
- case .finished: // TODO: do we need that?
+ case .initializing:
+ break // This case can be triggered by the KafkaConsumerMessagesDelegate
+ case .running(let client, let consumerState):
+ switch consumerState {
+ case .running, .waitingForMessages, .finished:
+ break
+ case .suspended(let source):
+ self.state = .running(client: client, messagePollLoopState: .running(source: source))
+ }
+ case .finishing, .finished:
 break
 }
 }

 /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to temporarily stop producing messages.
mutating func stopProducing() { - guard case let .running(client, consumerState) = self.state else { + guard case .running(let client, let consumerState) = self.state else { fatalError("\(#function) invoked while still in state \(self.state)") } switch consumerState { - case .running(let source): - self.state = .running(client: client, state: .suspended(source: source)) - case .suspended: + case .suspended, .finished: break + case .running(let source): + self.state = .running(client: client, messagePollLoopState: .suspended(source: source)) case .waitingForMessages(let source): - self.state = .running(client: client, state: .suspended(source: source)) - case .finished: - break + self.state = .running(client: client, messagePollLoopState: .suspended(source: source)) } } + /// The ``KafkaConsumerMessages`` asynchronous sequence was terminated. + mutating func finishMessageConsumption() { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing: + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .running(let client, _): + self.state = .running(client: client, messagePollLoopState: .finished) + case .finishing, .finished: + break + } + } } } diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift index c8f98212..e6cf82e5 100644 --- a/Tests/IntegrationTests/KafkaTests.swift +++ b/Tests/IntegrationTests/KafkaTests.swift @@ -599,56 +599,6 @@ final class KafkaTests: XCTestCase { } } - // TODO: remove - func testDelete() async throws { - var consumerConfig = KafkaConsumerConfiguration( - consumptionStrategy: .partition( - KafkaPartition(rawValue: 0), - topic: "test-topic", - offset: KafkaOffset(rawValue: 0) // Important: Read from beginning! 
- ), - bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] - ) - consumerConfig.pollInterval = .milliseconds(100) - consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning - consumerConfig.broker.addressFamily = .v4 - - let consumer = try KafkaConsumer( - configuration: consumerConfig, - logger: .kafkaTest - ) - - let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], logger: .kafkaTest) - let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) - - try await withThrowingTaskGroup(of: Void.self) { group in - // Run Task - group.addTask { - try await serviceGroup.run() - } - - // Consumer Task - group.addTask { - var count = 0 - for try await message in consumer.messages { - _ = message // drop message - count += 1 -// try await Task.sleep(for: .milliseconds(1)) - if count % 1000 == 0 { - print(count) - } - if count == 1_000_000 { - break - } - } - } - - // Wait for Consumer Task to complete - try await group.next() - // Shutdown the serviceGroup - await serviceGroup.triggerGracefulShutdown() - } - } // MARK: - Helpers private static func createTestMessages( From 5850674ca700a6d53b795d7e294d897e936548cd Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Tue, 17 Oct 2023 00:55:09 +0100 Subject: [PATCH 5/5] Review Franz + Blindspot Modifications: * `KafkaConsumer`: * end consumer message poll loop when async sequence drops message * do not sleep if we picked up reading new messages again after we finished reading a partition * `messageRunLoop`: * fix `fatalError` where `newMessagesProduced()` is invoked after `stopProducing()` * add func `batchConsumerPoll` that reads a batch of messages to avoid acquiring the lock in `messageRunLoop` too often --- Sources/Kafka/KafkaConsumer.swift | 89 ++++++++++++++++++------------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index a76ef1f6..1163f862 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -387,53 +387,39 @@ public final class KafkaConsumer: Sendable, Service { while !Task.isCancelled { let nextAction = self.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } switch nextAction { - case .pollForAndYieldMessage(let client, let source): - // Poll for new consumer message. - var result: Result? - do { - if let message = try client.consumerPoll() { - result = .success(message) - } - } catch { - result = .failure(error) - } - if let result { - let yieldResult = source.yield(result) + case .pollForAndYieldMessages(let client, let source): + // Poll for new consumer messages. + let messageResults = self.batchConsumerPoll(client: client) + if messageResults.isEmpty { + self.stateMachine.withLockedValue { $0.waitForNewMessages() } + } else { + let yieldResult = source.yield(contentsOf: messageResults) switch yieldResult { case .produceMore: break case .stopProducing: self.stateMachine.withLockedValue { $0.stopProducing() } case .dropped: - break + return } - } else { - self.stateMachine.withLockedValue { $0.waitForNewMessages() } } - case .pollForMessageAndSleep(let client, let source): - var result: Result? 
- do {
- if let message = try client.consumerPoll() {
- result = .success(message)
- }
- } catch {
- result = .failure(error)
- }
- if let result {
- self.stateMachine.withLockedValue { $0.produceMore() }
- let yieldResult = source.yield(result)
+ case .pollForMessagesIfAvailable(let client, let source):
+ let messageResults = self.batchConsumerPoll(client: client)
+ if messageResults.isEmpty {
+ // Still no new messages, so sleep.
+ try await Task.sleep(for: self.configuration.pollInterval)
+ } else {
+ // New messages were produced to the partition that we previously finished reading.
+ let yieldResult = source.yield(contentsOf: messageResults)
 switch yieldResult {
 case .produceMore:
 break
 case .stopProducing:
 self.stateMachine.withLockedValue { $0.stopProducing() }
 case .dropped:
- break
+ return
 }
-
- self.stateMachine.withLockedValue { $0.newMessagesProduced() }
 }
- try await Task.sleep(for: self.configuration.pollInterval)
 case .suspendPollLoop:
 try await Task.sleep(for: self.configuration.pollInterval)
 case .terminatePollLoop:
@@ -442,6 +428,36 @@ public final class KafkaConsumer: Sendable, Service {
 }
 }

+ /// Read `maxMessages` consumer messages from Kafka.
+ ///
+ /// - Parameters:
+ /// - client: Client used for handling the connection to the Kafka cluster.
+ /// - maxMessages: Maximum amount of consumer messages to read in this invocation.
+ private func batchConsumerPoll(
+ client: RDKafkaClient,
+ maxMessages: Int = 100
+ ) -> [Result<KafkaConsumerMessage, Error>] {
+ var messageResults = [Result<KafkaConsumerMessage, Error>]()
+ messageResults.reserveCapacity(maxMessages)
+
+ for _ in 0..<maxMessages {
+ var result: Result<KafkaConsumerMessage, Error>?
+ do {
+ if let message = try client.consumerPoll() {
+ result = .success(message)
+ }
+ } catch {
+ result = .failure(error)
+ }
+
+ if let result {
+ messageResults.append(result)
+ }
+ }
+
+ return messageResults
+ }
+
 /// Mark all messages up to the passed message in the topic as read.
 /// Schedules a commit and returns immediately.
 /// Any errors encountered after scheduling the commit will be discarded.
@@ -639,15 +655,16 @@ extension KafkaConsumer {
 ///
 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
- case pollForAndYieldMessage(
+ case pollForAndYieldMessages(
 client: RDKafkaClient,
 source: Producer.Source
 )
- /// Poll for a new ``KafkaConsumerMessage`` and sleep for ``KafkaConsumerConfiguration/pollInterval``.
+ /// Poll for a new ``KafkaConsumerMessage`` or sleep for ``KafkaConsumerConfiguration/pollInterval``
+ /// if there are no new messages to read from the partition.
 ///
 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
- case pollForMessageAndSleep(
+ case pollForMessagesIfAvailable(
 client: RDKafkaClient,
 source: Producer.Source
 )
@@ -670,11 +687,11 @@ extension KafkaConsumer {
 case .running(let client, let consumerState):
 switch consumerState {
 case .running(let source):
- return .pollForAndYieldMessage(client: client, source: source)
+ return .pollForAndYieldMessages(client: client, source: source)
 case .suspended(source: _):
 return .suspendPollLoop
 case .waitingForMessages(let source):
- return .pollForMessageAndSleep(client: client, source: source)
+ return .pollForMessagesIfAvailable(client: client, source: source)
 case .finished:
 return .terminatePollLoop
 }