From 5ed295fbc723c2d36c72744f4dac70d0871114f7 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 3 Jul 2023 10:35:40 +0100 Subject: [PATCH] Refactor `KafkaProducer` (#67) * Refactor `KafkaProducer` Motivation: * align `KafkaProducer` more with proposed changes to `KafkaConsumer` * `AsyncStream` was not handling `AsyncSequence` termination handling as we wanted it to, so revert back to use `NIOAsyncSequenceProducer` Modifications: * make `KafkaProducer` `final class` instead of `actor` * `KafkaProducer`: use `NIOAsyncSequenceProducer` instead of `AsyncSequence` for better termination handling -> shutdown `KafkaProducer` on termination of the `AsyncSequence` * introduce `StateMachine` to `KafkaProducer` * move internal state of `KafkaProducer` to `KafkaProducer.StateMachine` * remove unused `await` expressions when accessing `KafkaProducer` * update tests * update `README` * * rename KafkaProducer.StateMachine.State.shutDown to .finished * Remove unused awaits * KafkaProducer: move logger out of state * KafkaProducer: rename `killPollLoop` -> `terminatePollLoop` * Fix errors after rebase Modifications: * move `NoBackPressure` struct to `extension` of `NIOAsyncSequenceProducerBackPressureStrategies` * break down duplicate `ShutDownOnTerminate` type into two more specialised types for `KafkaConsumer` and `KafkaProducer` * add missing `config` parameter to `KafkaProducer`'s initialiser * Create wrapper for Kafka topic handle dict Modifications: * create new class `RDKafkaTopicHandles` that wraps a dictionary containing all topic names with their respective `rd_kafka_topic_t` handles * create method `KafkaClient.produce` wrapping the `rd_kafka_produce` method in a Swift way * Own implementation of `rd_kafka_flush()` Modifications: * `KafkaClient`: add new property `outgoingQueueSize` * `KafkaProducer.StateMachine`: add new state `.flushing` * `KafkaProducer.shutdownGracefully()`: * make non-async * remove invocation to `rd_kafka_flush` * set state to 
`KafkaProducer.StateMachine.State` to `.flushing` * `KafkaProducer` poll loop: * poll as long as `outgoingQueueSize` is > 0 to send out any enqueued `KafkaProducerMessage`s and serve any enqueued callbacks * `KafkaProducerTests`: add test asserting that the `librdkafka` `outq` is still being served after `KafkaProducer.shutdownGracefully` has been invoked as long as there are enqueued items * Review Franz Modifications: * rename `KafkaProducer.shutdownGracefully` to `KafkaProducer.triggerGracefulShutdown` * `KafkaProducer.send` separate error message when in state `.flushing` --- README.md | 6 +- Sources/SwiftKafka/KafkaClient.swift | 50 +++ Sources/SwiftKafka/KafkaConsumer.swift | 25 +- Sources/SwiftKafka/KafkaProducer.swift | 365 +++++++++++------- .../RDKafka/RDKafkaTopicHandles.swift | 79 ++++ ...ackPressureStrategies+NoBackPressure.swift | 23 ++ Tests/IntegrationTests/SwiftKafkaTests.swift | 14 +- .../SwiftKafkaTests/KafkaProducerTests.swift | 69 +++- 8 files changed, 455 insertions(+), 176 deletions(-) create mode 100644 Sources/SwiftKafka/RDKafka/RDKafkaTopicHandles.swift create mode 100644 Sources/SwiftKafka/Utilities/NIOAsyncSequenceProducerBackPressureStrategies+NoBackPressure.swift diff --git a/README.md b/README.md index 82a00424..3ba669d7 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ The `send(_:)` method of `KafkaProducer` returns a message-id that can later be ```swift let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"]) -let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements( +let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements( config: config, logger: .kafkaTest // Your logger here ) @@ -44,7 +44,7 @@ await withThrowingTaskGroup(of: Void.self) { group in // Task receiving acknowledgements group.addTask { - let messageID = try await producer.send( + let messageID = try producer.send( KafkaProducerMessage( topic: "topic-name", value: "Hello, 
World!" @@ -56,7 +56,7 @@ await withThrowingTaskGroup(of: Void.self) { group in } // Required - await producer.shutdownGracefully() + producer.triggerGracefulShutdown() } } ``` diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index 1dfb6876..ca5f26be 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -42,6 +42,47 @@ final class KafkaClient { rd_kafka_destroy(kafkaHandle) } + /// Produce a message to the Kafka cluster. + /// + /// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster. + /// - Parameter newMessageID: ID that was assigned to the `message`. + /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. + /// - Parameter topicHandles: Topic handles that this client uses to produce new messages + func produce( + message: KafkaProducerMessage, + newMessageID: UInt, + topicConfig: KafkaTopicConfiguration, + topicHandles: RDKafkaTopicHandles + ) throws { + let keyBytes: [UInt8]? + if var key = message.key { + keyBytes = key.readBytes(length: key.readableBytes) + } else { + keyBytes = nil + } + + let responseCode = try message.value.withUnsafeReadableBytes { valueBuffer in + return try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in + // Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster. + // Returns 0 on success, error code otherwise. + return rd_kafka_produce( + topicHandle, + message.partition.rawValue, + RD_KAFKA_MSG_F_COPY, + UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress), + valueBuffer.count, + keyBytes, + keyBytes?.count ?? 0, + UnsafeMutableRawPointer(bitPattern: newMessageID) + ) + } + } + + guard responseCode == 0 else { + throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error()) + } + } + /// Polls the Kafka client for events. /// /// Events will cause application-provided callbacks to be called. 
@@ -210,10 +251,19 @@ final class KafkaClient { } } + /// Returns `true` if the underlying `librdkafka` consumer is closed. var isConsumerClosed: Bool { rd_kafka_consumer_closed(self.kafkaHandle) == 1 } + /// Returns the current out queue length. + /// + /// This means the number of producer messages that wait to be sent + the number of any + /// callbacks that are waiting to be executed by invoking `rd_kafka_poll`. + var outgoingQueueSize: Int32 { + return rd_kafka_outq_len(self.kafkaHandle) + } + /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle. /// - Warning: Do not escape the pointer from the closure for later use. /// - Parameter body: The closure will use the Kafka handle pointer. diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index c039c65c..3fd9bc4f 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -17,23 +17,15 @@ import Logging import NIOConcurrencyHelpers import NIOCore -// MARK: - NoBackPressure - -/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true. -struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy { - func didYield(bufferDepth: Int) -> Bool { true } - func didConsume(bufferDepth: Int) -> Bool { true } -} - -// MARK: - ShutDownOnTerminate +// MARK: - KafkaConsumerShutDownOnTerminate /// `NIOAsyncSequenceProducerDelegate` that terminates the shuts the consumer down when /// `didTerminate()` is invoked. 
-struct ShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock +internal struct KafkaConsumerShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored property is protected by a lock let stateMachine: NIOLockedValueBox } -extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { +extension KafkaConsumerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { func produceMore() { // No back pressure return @@ -68,7 +60,8 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct KafkaConsumerMessages: AsyncSequence { public typealias Element = KafkaConsumerMessage - typealias WrappedSequence = NIOAsyncSequenceProducer + typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure + typealias WrappedSequence = NIOAsyncSequenceProducer let wrappedSequence: WrappedSequence /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). @@ -91,8 +84,8 @@ public struct KafkaConsumerMessages: AsyncSequence { public final class KafkaConsumer { typealias Producer = NIOAsyncSequenceProducer< KafkaConsumerMessage, - NoBackPressure, - ShutdownOnTerminate + NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure, + KafkaConsumerShutdownOnTerminate > /// The configuration object of the consumer client. 
private var config: KafkaConsumerConfiguration @@ -123,8 +116,8 @@ public final class KafkaConsumer { let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( elementType: KafkaConsumerMessage.self, - backPressureStrategy: NoBackPressure(), - delegate: ShutdownOnTerminate(stateMachine: self.stateMachine) + backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), + delegate: KafkaConsumerShutdownOnTerminate(stateMachine: self.stateMachine) ) self.messages = KafkaConsumerMessages( diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 69c18782..e37a098d 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -14,16 +14,40 @@ import Crdkafka import Logging +import NIOConcurrencyHelpers +import NIOCore + +// MARK: - KafkaProducerShutdownOnTerminate + +/// `NIOAsyncSequenceProducerDelegate` that shuts the producer down when +/// `didTerminate()` is invoked. +internal struct KafkaProducerShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored property is protected by a lock + let stateMachine: NIOLockedValueBox +} + +extension KafkaProducerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { + func produceMore() { + // No back pressure + return + } + + func didTerminate() { + self.stateMachine.withLockedValue { $0.finish() } + } +} + +// MARK: - KafkaMessageAcknowledgements /// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``). 
public struct KafkaMessageAcknowledgements: AsyncSequence { public typealias Element = Result - typealias WrappedSequence = AsyncStream + typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure + typealias WrappedSequence = NIOAsyncSequenceProducer let wrappedSequence: WrappedSequence /// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``). public struct AcknowledgedMessagesAsyncIterator: AsyncIteratorProtocol { - var wrappedIterator: AsyncStream.AsyncIterator + var wrappedIterator: WrappedSequence.AsyncIterator public mutating func next() async -> Element? { await self.wrappedIterator.next() @@ -36,59 +60,39 @@ public struct KafkaMessageAcknowledgements: AsyncSequence { } /// Send messages to the Kafka cluster. -/// Please make sure to explicitly call ``shutdownGracefully(timeout:)`` when the ``KafkaProducer`` is not used anymore. +/// Please make sure to explicitly call ``triggerGracefulShutdown()`` when the ``KafkaProducer`` is not used anymore. /// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfiguration`` /// configuration object (only works if server has `auto.create.topics.enable` property set). -public actor KafkaProducer { - /// States that the ``KafkaProducer`` can have. - private enum State { - /// The ``KafkaProducer`` has started and is ready to use. - case started - /// ``KafkaProducer/shutdownGracefully()`` has been invoked and the ``KafkaProducer`` - /// is in the process of receiving all outstanding acknowlegements and shutting down. - case shuttingDown - /// The ``KafkaProducer`` has been shut down and cannot be used anymore. 
- case shutDown - } +public final class KafkaProducer { + typealias Producer = NIOAsyncSequenceProducer< + Result, + NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure, + KafkaProducerShutdownOnTerminate + > /// State of the ``KafkaProducer``. - private var state: State + private let stateMachine: NIOLockedValueBox - /// Counter that is used to assign each message a unique ID. - /// Every time a new message is sent to the Kafka cluster, the counter is increased by one. - private var messageIDCounter: UInt = 0 /// The configuration object of the producer client. - private var config: KafkaProducerConfiguration - /// The ``TopicConfiguration`` used for newly created topics. + private let config: KafkaProducerConfiguration + /// Topic configuration that is used when a new topic has to be created by the producer. private let topicConfig: KafkaTopicConfiguration - /// A logger. - private let logger: Logger - /// Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer. - private var topicHandles: [String: OpaquePointer] - - /// Used for handling the connection to the Kafka cluster. - private let client: KafkaClient // Private initializer, use factory methods to create KafkaProducer /// Initialize a new ``KafkaProducer``. /// - /// - Parameter client: The ``KafkaClient`` instance associated with the ``KafkaProducer``. + /// - Parameter stateMachine: The ``KafkaProducer/StateMachine`` instance associated with the ``KafkaProducer``./// /// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``. /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. - /// - Parameter logger: A logger. /// - Throws: A ``KafkaError`` if initializing the producer failed. 
private init( - client: KafkaClient, + stateMachine: NIOLockedValueBox, config: KafkaProducerConfiguration, - topicConfig: KafkaTopicConfiguration, - logger: Logger - ) async throws { - self.client = client + topicConfig: KafkaTopicConfiguration + ) throws { + self.stateMachine = stateMachine self.config = config self.topicConfig = topicConfig - self.topicHandles = [:] - self.logger = logger - self.state = .started } /// Initialize a new ``KafkaProducer``. @@ -104,7 +108,9 @@ public actor KafkaProducer { config: KafkaProducerConfiguration = KafkaProducerConfiguration(), topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(), logger: Logger - ) async throws -> KafkaProducer { + ) throws -> KafkaProducer { + let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) + let client = try RDKafka.createClient( type: .producer, configDictionary: config.dictionary, @@ -114,13 +120,19 @@ public actor KafkaProducer { logger: logger ) - let producer = try await KafkaProducer( - client: client, + let producer = try KafkaProducer( + stateMachine: stateMachine, config: config, - topicConfig: topicConfig, - logger: logger + topicConfig: topicConfig ) + stateMachine.withLockedValue { + $0.initialize( + client: client, + source: nil + ) + } + return producer } @@ -140,41 +152,45 @@ public actor KafkaProducer { config: KafkaProducerConfiguration = KafkaProducerConfiguration(), topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(), logger: Logger - ) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) { - var streamContinuation: AsyncStream>.Continuation? 
- let stream = AsyncStream { continuation in - streamContinuation = continuation - } + ) throws -> (KafkaProducer, KafkaMessageAcknowledgements) { + let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) + + let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( + elementType: Result.self, + backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), + delegate: KafkaProducerShutdownOnTerminate(stateMachine: stateMachine) + ) + let source = sourceAndSequence.source let client = try RDKafka.createClient( type: .producer, configDictionary: config.dictionary, - deliveryReportCallback: { [logger, streamContinuation] messageResult in + deliveryReportCallback: { [logger, source] messageResult in guard let messageResult else { logger.error("Could not resolve acknowledged message") return } // Ignore YieldResult as we don't support back pressure in KafkaProducer - streamContinuation?.yield(messageResult) + _ = source.yield(messageResult) }, logger: logger ) - let producer = try await KafkaProducer( - client: client, + let producer = try KafkaProducer( + stateMachine: stateMachine, config: config, - topicConfig: topicConfig, - logger: logger + topicConfig: topicConfig ) - streamContinuation?.onTermination = { [producer] _ in - Task { - await producer.shutdownGracefully() - } + stateMachine.withLockedValue { + $0.initialize( + client: client, + source: source + ) } - let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: stream) + let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: sourceAndSequence.sequence) return (producer, acknowlegementsSequence) } @@ -182,40 +198,26 @@ public actor KafkaProducer { /// /// This method flushes any buffered messages and waits until a callback is received for all of them. /// Afterwards, it shuts down the connection to Kafka and cleans any remaining state up. 
- /// - Parameter timeout: Maximum amount of milliseconds this method waits for any outstanding messages to be sent. - public func shutdownGracefully(timeout: Int32 = 10000) async { - switch self.state { - case .started: - self.state = .shuttingDown - await self._shutdownGracefully(timeout: timeout) - case .shuttingDown, .shutDown: - return - } - } - - private func _shutdownGracefully(timeout: Int32) async { - await withCheckedContinuation { (continuation: CheckedContinuation) in - // Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called - self.client.withKafkaHandlePointer { handle in - rd_kafka_flush(handle, timeout) - continuation.resume() - } - } - - for (_, topicHandle) in self.topicHandles { - rd_kafka_topic_destroy(topicHandle) - } - - self.state = .shutDown + public func triggerGracefulShutdown() { // TODO(felix): make internal once we adapt swift-service-lifecycle + self.stateMachine.withLockedValue { $0.finish() } } /// Start polling Kafka for acknowledged messages. /// /// - Returns: An awaitable task representing the execution of the poll loop. public func run() async throws { - while self.state == .started { - self.client.poll(timeout: 0) - try await Task.sleep(for: self.config.pollInterval) + while !Task.isCancelled { + let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } + switch nextAction { + case .poll(let client): + client.poll(timeout: 0) + try await Task.sleep(for: self.config.pollInterval) + case .terminatePollLoopAndFinishSource(let source): + source?.finish() + return + case .terminatePollLoop: + return + } } } @@ -228,68 +230,163 @@ public actor KafkaProducer { /// - Throws: A ``KafkaError`` if sending the message failed. 
@discardableResult public func send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID { - switch self.state { - case .started: - return try self._send(message) - case .shuttingDown, .shutDown: - throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer") + let action = try self.stateMachine.withLockedValue { try $0.send() } + switch action { + case .send(let client, let newMessageID, let topicHandles): + try client.produce( + message: message, + newMessageID: newMessageID, + topicConfig: self.topicConfig, + topicHandles: topicHandles + ) + return KafkaProducerMessageID(rawValue: newMessageID) } } +} - private func _send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID { - let topicHandle = try self.createTopicHandleIfNeeded(topic: message.topic) - - let keyBytes: [UInt8]? - if var key = message.key { - keyBytes = key.readBytes(length: key.readableBytes) - } else { - keyBytes = nil +// MARK: - KafkaProducer + StateMachine + +extension KafkaProducer { + /// State machine representing the state of the ``KafkaProducer``. + struct StateMachine { + /// A logger. + let logger: Logger + + /// The state of the ``StateMachine``. + enum State { + /// The state machine has been initialized with init() but is not yet Initialized + /// using `func initialize()` (required). + case uninitialized + /// The ``KafkaProducer`` has started and is ready to use. + /// + /// - Parameter messageIDCounter:Used to incrementally assign unique IDs to messages. + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + /// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer. 
+ case started( + client: KafkaClient, + messageIDCounter: UInt, + source: Producer.Source?, + topicHandles: RDKafkaTopicHandles + ) + /// ``KafkaProducer/triggerGracefulShutdown()`` was invoked so we are flushing + /// any messages that wait to be sent and serve any remaining queued callbacks. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + case flushing( + client: KafkaClient, + source: Producer.Source? + ) + /// The ``KafkaProducer`` has been shut down and cannot be used anymore. + case finished } - self.messageIDCounter += 1 - - let responseCode = message.value.withUnsafeReadableBytes { valueBuffer in - - // Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster. - // Returns 0 on success, error code otherwise. - return rd_kafka_produce( - topicHandle, - message.partition.rawValue, - RD_KAFKA_MSG_F_COPY, - UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress), - valueBuffer.count, - keyBytes, - keyBytes?.count ?? 0, - UnsafeMutableRawPointer(bitPattern: self.messageIDCounter) + /// The current state of the StateMachine. + var state: State = .uninitialized + + /// Delayed initialization of `StateMachine` as the `source` is not yet available + /// when the normal initialization occurs. + mutating func initialize( + client: KafkaClient, + source: Producer.Source? + ) { + guard case .uninitialized = self.state else { + fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") + } + self.state = .started( + client: client, + messageIDCounter: 0, + source: source, + topicHandles: RDKafkaTopicHandles(client: client) ) } - guard responseCode == 0 else { - throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error()) + /// Action to be taken when wanting to poll. + enum PollLoopAction { + /// Poll client for new consumer messages. 
+ case poll(client: KafkaClient) + /// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`. + /// + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + case terminatePollLoopAndFinishSource(source: Producer.Source?) + /// Terminate the poll loop. + case terminatePollLoop } - return KafkaProducerMessageID(rawValue: self.messageIDCounter) - } + /// Returns the next action to be taken when wanting to poll. + /// - Returns: The next action to be taken, either polling or terminating the poll loop. + /// + /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. + mutating func nextPollLoopAction() -> PollLoopAction { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .started(let client, _, _, _): + return .poll(client: client) + case .flushing(let client, let source): + if client.outgoingQueueSize > 0 { + return .poll(client: client) + } else { + self.state = .finished + return .terminatePollLoopAndFinishSource(source: source) + } + case .finished: + return .terminatePollLoop + } + } - /// Check `topicHandles` for a handle matching the topic name and create a new handle if needed. - /// - Parameter topic: The name of the topic that is addressed. - private func createTopicHandleIfNeeded(topic: String) throws -> OpaquePointer? { - if let handle = self.topicHandles[topic] { - return handle - } else { - let newHandle = try self.client.withKafkaHandlePointer { handle in - let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfig: self.topicConfig) - return rd_kafka_topic_new( - handle, - topic, - rdTopicConf + /// Action to be taken when wanting to send a message. + enum SendAction { + /// Send the message. + /// + /// - Important: `newMessageID` is the new message ID assigned to the message to be sent. 
+ case send( + client: KafkaClient, + newMessageID: UInt, + topicHandles: RDKafkaTopicHandles + ) + } + + /// Get action to be taken when wanting to send a message. + /// + /// - Returns: The action to be taken. + mutating func send() throws -> SendAction { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .started(let client, let messageIDCounter, let source, let topicHandles): + let newMessageID = messageIDCounter + 1 + self.state = .started( + client: client, + messageIDCounter: newMessageID, + source: source, + topicHandles: topicHandles ) - // rd_kafka_topic_new deallocates topic config object + return .send( + client: client, + newMessageID: newMessageID, + topicHandles: topicHandles + ) + case .flushing: + throw KafkaError.connectionClosed(reason: "Producer in the process of flushing and shutting down") + case .finished: + throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer") } - if newHandle != nil { - self.topicHandles[topic] = newHandle + } + + /// Get action to be taken when wanting to do close the producer. + /// + /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. 
+ mutating func finish() { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .started(let client, _, let source, _): + self.state = .flushing(client: client, source: source) + case .flushing, .finished: + break } - return newHandle } } } diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicHandles.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicHandles.swift new file mode 100644 index 00000000..8e213278 --- /dev/null +++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicHandles.swift @@ -0,0 +1,79 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-gsoc open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka + +/// Swift class that matches topic names with their respective `rd_kafka_topic_t` handles. +internal class RDKafkaTopicHandles { + private var _internal: [String: OpaquePointer] + + // Note: we retain the client to ensure it does not get + // deinitialized before rd_kafka_topic_destroy() is invoked (required) + private let client: KafkaClient + + init(client: KafkaClient) { + self._internal = [:] + self.client = client + } + + deinit { + for (_, topicHandle) in self._internal { + rd_kafka_topic_destroy(topicHandle) + } + } + + /// Scoped accessor that enables safe access to the pointer of the topic's handle. + /// - Warning: Do not escape the pointer from the closure for later use. + /// - Parameter topic: The name of the topic that is addressed. + /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. 
+ /// - Parameter body: The closure will use the topic handle pointer. + @discardableResult + func withTopicHandlePointer( + topic: String, + topicConfig: KafkaTopicConfiguration, + _ body: (OpaquePointer) throws -> T + ) throws -> T { + let topicHandle = try self.createTopicHandleIfNeeded(topic: topic, topicConfig: topicConfig) + return try body(topicHandle) + } + + /// Check `topicHandles` for a handle matching the topic name and create a new handle if needed. + /// - Parameter topic: The name of the topic that is addressed. + private func createTopicHandleIfNeeded( + topic: String, + topicConfig: KafkaTopicConfiguration + ) throws -> OpaquePointer { + if let handle = self._internal[topic] { + return handle + } else { + let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfig: topicConfig) + let newHandle = self.client.withKafkaHandlePointer { kafkaHandle in + rd_kafka_topic_new( + kafkaHandle, + topic, + rdTopicConf + ) + // rd_kafka_topic_new deallocates topic config object + } + + guard let newHandle else { + // newHandle is nil, so we can retrieve error through rd_kafka_last_error() + let error = KafkaError.rdKafkaError(wrapping: rd_kafka_last_error()) + throw error + } + self._internal[topic] = newHandle + return newHandle + } + } +} diff --git a/Sources/SwiftKafka/Utilities/NIOAsyncSequenceProducerBackPressureStrategies+NoBackPressure.swift b/Sources/SwiftKafka/Utilities/NIOAsyncSequenceProducerBackPressureStrategies+NoBackPressure.swift new file mode 100644 index 00000000..e53170f1 --- /dev/null +++ b/Sources/SwiftKafka/Utilities/NIOAsyncSequenceProducerBackPressureStrategies+NoBackPressure.swift @@ -0,0 +1,23 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-gsoc open source project +// +// Copyright (c) 2022 Apple Inc. 
and the swift-kafka-gsoc project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +extension NIOAsyncSequenceProducerBackPressureStrategies { + /// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true. + struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy { + func didYield(bufferDepth: Int) -> Bool { true } + func didConsume(bufferDepth: Int) -> Bool { true } + } +} diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift index f3decc9f..c011b36e 100644 --- a/Tests/IntegrationTests/SwiftKafkaTests.swift +++ b/Tests/IntegrationTests/SwiftKafkaTests.swift @@ -74,7 +74,7 @@ final class SwiftKafkaTests: XCTestCase { func testProduceAndConsumeWithConsumerGroup() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) let consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(groupID: "subscription-test-group-id", topics: [self.uniqueTestTopic]), @@ -101,7 +101,7 @@ final class SwiftKafkaTests: XCTestCase { acknowledgements: acks, messages: testMessages ) - await producer.shutdownGracefully() + producer.triggerGracefulShutdown() } // Consumer Run Task @@ -136,7 +136,7 @@ final class SwiftKafkaTests: XCTestCase { func testProduceAndConsumeWithAssignedTopicPartition() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, acks) = try await 
KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) let consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .partition( @@ -167,7 +167,7 @@ final class SwiftKafkaTests: XCTestCase { acknowledgements: acks, messages: testMessages ) - await producer.shutdownGracefully() + producer.triggerGracefulShutdown() } // Consumer Run Task @@ -202,7 +202,7 @@ final class SwiftKafkaTests: XCTestCase { func testProduceAndConsumeWithCommitSync() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) let consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(groupID: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]), @@ -230,7 +230,7 @@ final class SwiftKafkaTests: XCTestCase { acknowledgements: acks, messages: testMessages ) - await producer.shutdownGracefully() + producer.triggerGracefulShutdown() } // Consumer Run Task @@ -288,7 +288,7 @@ final class SwiftKafkaTests: XCTestCase { var messageIDs = Set() for message in messages { - messageIDs.insert(try await producer.send(message)) + messageIDs.insert(try producer.send(message)) } var acknowledgedMessages = Set() diff --git a/Tests/SwiftKafkaTests/KafkaProducerTests.swift b/Tests/SwiftKafkaTests/KafkaProducerTests.swift index 666da3c4..c97f89d8 100644 --- a/Tests/SwiftKafkaTests/KafkaProducerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaProducerTests.swift @@ -52,7 +52,7 @@ final class KafkaProducerTests: XCTestCase { } func testSend() async throws { - let (producer, acks) = try await 
KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) await withThrowingTaskGroup(of: Void.self) { group in @@ -70,7 +70,7 @@ final class KafkaProducerTests: XCTestCase { value: "Hello, World!" ) - let messageID = try await producer.send(message) + let messageID = try producer.send(message) for await messageResult in acks { guard case .success(let acknowledgedMessage) = messageResult else { @@ -85,13 +85,13 @@ final class KafkaProducerTests: XCTestCase { break } - await producer.shutdownGracefully() + producer.triggerGracefulShutdown() } } } func testSendEmptyMessage() async throws { - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) await withThrowingTaskGroup(of: Void.self) { group in @@ -108,7 +108,7 @@ final class KafkaProducerTests: XCTestCase { value: ByteBuffer() ) - let messageID = try await producer.send(message) + let messageID = try producer.send(message) for await messageResult in acks { guard case .success(let acknowledgedMessage) = messageResult else { @@ -123,13 +123,13 @@ final class KafkaProducerTests: XCTestCase { break } - await producer.shutdownGracefully() + producer.triggerGracefulShutdown() } } } func testSendTwoTopics() async throws { - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) await withThrowingTaskGroup(of: Void.self) { group in // Run Task @@ -152,8 +152,8 @@ final class KafkaProducerTests: XCTestCase { var messageIDs = Set() - messageIDs.insert(try await producer.send(message1)) - messageIDs.insert(try await 
producer.send(message2)) + messageIDs.insert(try producer.send(message1)) + messageIDs.insert(try producer.send(message2)) var acknowledgedMessages = Set() @@ -179,14 +179,51 @@ final class KafkaProducerTests: XCTestCase { XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message1.value })) XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message2.value })) - await producer.shutdownGracefully() + producer.triggerGracefulShutdown() + } + } + } + + func testFlushQueuedProducerMessages() async throws { + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + + let message = KafkaProducerMessage( + topic: "test-topic", + key: "key", + value: "Hello, World!" + ) + let messageID = try producer.send(message) + + // We have not invoked `producer.run()` yet, which means that our message and its + // delivery report callback have been enqueued onto the `librdkafka` `outq`. + // By invoking `triggerGracefulShutdown()` now our `KafkaProducer` should enter the + // `flushing` state. + producer.triggerGracefulShutdown() + + await withThrowingTaskGroup(of: Void.self) { group in + // Now that we are in the `flushing` state, start the run loop. + group.addTask { + try await producer.run() + } + + // Since we are flushing, we should receive our messageAcknowledgement despite + // having invoked `triggerGracefulShutdown()` before. + group.addTask { + var iterator = acks.makeAsyncIterator() + let acknowledgement = await iterator.next()! 
+ switch acknowledgement { + case .success(let acknowledgedMessage): + XCTAssertEqual(messageID, acknowledgedMessage.id) + case .failure(let error): + XCTFail("Unexpected acknowledgement error: \(error)") + } } } } func testProducerNotUsableAfterShutdown() async throws { - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) - await producer.shutdownGracefully() + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + producer.triggerGracefulShutdown() await withThrowingTaskGroup(of: Void.self) { group in @@ -203,12 +240,12 @@ final class KafkaProducerTests: XCTestCase { ) do { - try await producer.send(message) + try producer.send(message) XCTFail("Method should have thrown error") } catch {} // This subscribes to the acknowledgements stream and immediately terminates the stream. - // Required to kill the run task. + // Required to terminate the run task. var iterator: KafkaMessageAcknowledgements.AsyncIterator? = acks.makeAsyncIterator() _ = iterator iterator = nil @@ -219,12 +256,12 @@ final class KafkaProducerTests: XCTestCase { func testNoMemoryLeakAfterShutdown() async throws { var producer: KafkaProducer? var acks: KafkaMessageAcknowledgements? - (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) _ = acks weak var producerCopy = producer - await producer?.shutdownGracefully() + producer?.triggerGracefulShutdown() producer = nil // Make sure to terminate the AsyncSequence acks = nil