diff --git a/README.md b/README.md index 94d35020..0c8accf9 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ await withThrowingTaskGroup(of: Void.self) { group in } // Required - await producer.shutdownGracefully() + producer.shutdownGracefully() } } ``` diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index bee706bf..ca5f26be 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -251,10 +251,19 @@ final class KafkaClient { } } + /// Returns `true` if the underlying `librdkafka` consumer is closed. var isConsumerClosed: Bool { rd_kafka_consumer_closed(self.kafkaHandle) == 1 } + /// Returns the current out queue length. + /// + /// This means the number of producer messages that wait to be sent + the number of any + /// callbacks that are waiting to be executed by invoking `rd_kafka_poll`. + var outgoingQueueSize: Int32 { + return rd_kafka_outq_len(self.kafkaHandle) + } + /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle. /// - Warning: Do not escape the pointer from the closure for later use. /// - Parameter body: The closure will use the Kafka handle pointer. diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index c534e13a..400ebacf 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -32,19 +32,7 @@ extension KafkaProducerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { } func didTerminate() { - let action = self.stateMachine.withLockedValue { $0.finish() } - switch action { - case .shutdownGracefullyAndFinishSource(let client, let source): - Task { - await KafkaProducer._shutDownGracefully( - client: client, - source: source, - timeout: 10000 - ) - } - case .none: - return - } + self.stateMachine.withLockedValue { $0.finish() } } } @@ -72,7 +60,7 @@ public struct KafkaMessageAcknowledgements: AsyncSequence { } /// Send messages to the Kafka cluster. -/// Please make sure to explicitly call ``shutdownGracefully(timeout:)`` when the ``KafkaProducer`` is not used anymore. +/// Please make sure to explicitly call ``shutdownGracefully()`` when the ``KafkaProducer`` is not used anymore. /// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfiguration`` /// configuration object (only works if server has `auto.create.topics.enable` property set). public final class KafkaProducer { @@ -210,49 +198,23 @@ public final class KafkaProducer { /// /// This method flushes any buffered messages and waits until a callback is received for all of them. /// Afterwards, it shuts down the connection to Kafka and cleans any remaining state up. - /// - Parameter timeout: Maximum amount of milliseconds this method waits for any outstanding messages to be sent. - public func shutdownGracefully(timeout: Int32 = 10000) async { - let action = self.stateMachine.withLockedValue { $0.finish() } - switch action { - case .shutdownGracefullyAndFinishSource(let client, let source): - await KafkaProducer._shutDownGracefully( - client: client, - source: source, - timeout: timeout - ) - case .none: - return - } - } - - // Static so we perform this without needing a reference to `KafkaProducer` - static func _shutDownGracefully( - client: KafkaClient, - source: Producer.Source?, - timeout: Int32 - ) async { - source?.finish() - - await withCheckedContinuation { (continuation: CheckedContinuation) in - // Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called - client.withKafkaHandlePointer { handle in - rd_kafka_flush(handle, timeout) - continuation.resume() - } - } + public func shutdownGracefully() { + self.stateMachine.withLockedValue { $0.finish() } } /// Start polling Kafka for acknowledged messages. /// /// - Returns: An awaitable task representing the execution of the poll loop. public func run() async throws { - // TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle) while !Task.isCancelled { let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } switch nextAction { case .poll(let client): client.poll(timeout: 0) try await Task.sleep(for: self.config.pollInterval) + case .terminatePollLoopAndFinishSource(let source): + source?.finish() + return case .terminatePollLoop: return } @@ -307,6 +269,15 @@ extension KafkaProducer { source: Producer.Source?, topicHandles: RDKafkaTopicHandles ) + /// ``KafkaProducer/shutdownGracefully()`` was invoked so we are flushing + /// any messages that wait to be sent and serve any remaining queued callbacks. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + case flushing( + client: KafkaClient, + source: Producer.Source? + ) /// The ``KafkaProducer`` has been shut down and cannot be used anymore. case finished } @@ -335,6 +306,10 @@ extension KafkaProducer { enum PollLoopAction { /// Poll client for new consumer messages. case poll(client: KafkaClient) + /// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`. + /// + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + case terminatePollLoopAndFinishSource(source: Producer.Source?) /// Terminate the poll loop. case terminatePollLoop } @@ -343,12 +318,19 @@ extension KafkaProducer { /// - Returns: The next action to be taken, either polling or terminating the poll loop. /// /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. - func nextPollLoopAction() -> PollLoopAction { + mutating func nextPollLoopAction() -> PollLoopAction { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .started(let client, _, _, _): return .poll(client: client) + case .flushing(let client, let source): + if client.outgoingQueueSize > 0 { + return .poll(client: client) + } else { + self.state = .finished + return .terminatePollLoopAndFinishSource(source: source) + } case .finished: return .terminatePollLoop } @@ -386,39 +368,22 @@ extension KafkaProducer { newMessageID: newMessageID, topicHandles: topicHandles ) - case .finished: + case .flushing, .finished: throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer") } } - /// Action to be taken when wanting to do close the producer. - enum FinishAction { - /// Shut down the ``KafkaProducer`` and finish the given `source` object. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case shutdownGracefullyAndFinishSource( - client: KafkaClient, - source: Producer.Source? - ) - } - /// Get action to be taken when wanting to do close the producer. - /// - Returns: The action to be taken, or `nil` if there is no action to be taken. /// /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. - mutating func finish() -> FinishAction? { + mutating func finish() { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .started(let client, _, let source, _): - self.state = .finished - return .shutdownGracefullyAndFinishSource( - client: client, - source: source - ) - case .finished: - return nil + self.state = .flushing(client: client, source: source) + case .flushing, .finished: + break } } } diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift index 4c439680..2d57660a 100644 --- a/Tests/IntegrationTests/SwiftKafkaTests.swift +++ b/Tests/IntegrationTests/SwiftKafkaTests.swift @@ -101,7 +101,7 @@ final class SwiftKafkaTests: XCTestCase { acknowledgements: acks, messages: testMessages ) - await producer.shutdownGracefully() + producer.shutdownGracefully() } // Consumer Run Task @@ -167,7 +167,7 @@ final class SwiftKafkaTests: XCTestCase { acknowledgements: acks, messages: testMessages ) - await producer.shutdownGracefully() + producer.shutdownGracefully() } // Consumer Run Task @@ -230,7 +230,7 @@ final class SwiftKafkaTests: XCTestCase { acknowledgements: acks, messages: testMessages ) - await producer.shutdownGracefully() + producer.shutdownGracefully() } // Consumer Run Task diff --git a/Tests/SwiftKafkaTests/KafkaProducerTests.swift b/Tests/SwiftKafkaTests/KafkaProducerTests.swift index 1575bd00..3a1bf99c 100644 --- a/Tests/SwiftKafkaTests/KafkaProducerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaProducerTests.swift @@ -85,7 +85,7 @@ final class KafkaProducerTests: XCTestCase { break } - await producer.shutdownGracefully() + producer.shutdownGracefully() } } } @@ -123,7 +123,7 @@ final class KafkaProducerTests: XCTestCase { break } - await producer.shutdownGracefully() + producer.shutdownGracefully() } } } @@ -179,14 +179,51 @@ final class KafkaProducerTests: XCTestCase { XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message1.value })) XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message2.value })) - await producer.shutdownGracefully() + producer.shutdownGracefully() + } + } + } + + func testFlushQueuedProducerMessages() async throws { + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + + let message = KafkaProducerMessage( + topic: "test-topic", + key: "key", + value: "Hello, World!" + ) + let messageID = try producer.send(message) + + // We have not invoked `producer.run()` yet, which means that our message and its + // delivery report callback have been enqueued onto the `librdkafka` `outq`. + // By invoking `shutdownGracefully()` now our `KafkaProducer` should enter the + // `flushing` state. + producer.shutdownGracefully() + + await withThrowingTaskGroup(of: Void.self) { group in + // Now that we are in the `flushing` state, start the run loop. + group.addTask { + try await producer.run() + } + + // Since we are flushing, we should receive our messageAcknowledgement despite + // having invoked `shutdownGracefully()` before. + group.addTask { + var iterator = acks.makeAsyncIterator() + let acknowledgement = await iterator.next()! + switch acknowledgement { + case .success(let acknowledgedMessage): + XCTAssertEqual(messageID, acknowledgedMessage.id) + case .failure(let error): + XCTFail("Unexpected acknowledgement error: \(error)") + } } } } func testProducerNotUsableAfterShutdown() async throws { let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) - await producer.shutdownGracefully() + producer.shutdownGracefully() await withThrowingTaskGroup(of: Void.self) { group in @@ -224,7 +261,7 @@ final class KafkaProducerTests: XCTestCase { weak var producerCopy = producer - await producer?.shutdownGracefully() + producer?.shutdownGracefully() producer = nil // Make sure to terminate the AsyncSequence acks = nil