From c9b4d03f8e17f427bc8237fa3173e32e687a0836 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Fri, 30 Jun 2023 11:55:12 +0100 Subject: [PATCH] Review Franz Modifications: * rename `KafkaProducer.shutdownGracefully` to `KafkaProducer.triggerGracefulShutdown` * `KafkaProducer.send` separate error message when in state `.flushing` --- README.md | 2 +- Sources/SwiftKafka/KafkaProducer.swift | 10 ++++++---- Tests/IntegrationTests/SwiftKafkaTests.swift | 6 +++--- Tests/SwiftKafkaTests/KafkaProducerTests.swift | 16 ++++++++-------- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 0c8accf9..3ba669d7 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ await withThrowingTaskGroup(of: Void.self) { group in } // Required - producer.shutdownGracefully() + producer.triggerGracefulShutdown() } } ``` diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 400ebacf..e37a098d 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -60,7 +60,7 @@ public struct KafkaMessageAcknowledgements: AsyncSequence { } /// Send messages to the Kafka cluster. -/// Please make sure to explicitly call ``shutdownGracefully()`` when the ``KafkaProducer`` is not used anymore. +/// Please make sure to explicitly call ``triggerGracefulShutdown()`` when the ``KafkaProducer`` is not used anymore. /// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfiguration`` /// configuration object (only works if server has `auto.create.topics.enable` property set). public final class KafkaProducer { @@ -198,7 +198,7 @@ public final class KafkaProducer { /// /// This method flushes any buffered messages and waits until a callback is received for all of them. /// Afterwards, it shuts down the connection to Kafka and cleans any remaining state up. - public func shutdownGracefully() { + public func triggerGracefulShutdown() { // TODO(felix): make internal once we adapt swift-service-lifecycle self.stateMachine.withLockedValue { $0.finish() } } @@ -269,7 +269,7 @@ extension KafkaProducer { source: Producer.Source?, topicHandles: RDKafkaTopicHandles ) - /// ``KafkaProducer/shutdownGracefully()`` was invoked so we are flushing + /// ``KafkaProducer/triggerGracefulShutdown()`` was invoked so we are flushing /// any messages that wait to be sent and serve any remaining queued callbacks. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. @@ -368,7 +368,9 @@ extension KafkaProducer { newMessageID: newMessageID, topicHandles: topicHandles ) - case .flushing, .finished: + case .flushing: + throw KafkaError.connectionClosed(reason: "Producer in the process of flushing and shutting down") + case .finished: throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer") } } diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift index 2d57660a..c011b36e 100644 --- a/Tests/IntegrationTests/SwiftKafkaTests.swift +++ b/Tests/IntegrationTests/SwiftKafkaTests.swift @@ -101,7 +101,7 @@ final class SwiftKafkaTests: XCTestCase { acknowledgements: acks, messages: testMessages ) - producer.shutdownGracefully() + producer.triggerGracefulShutdown() } // Consumer Run Task @@ -167,7 +167,7 @@ final class SwiftKafkaTests: XCTestCase { acknowledgements: acks, messages: testMessages ) - producer.shutdownGracefully() + producer.triggerGracefulShutdown() } // Consumer Run Task @@ -230,7 +230,7 @@ final class SwiftKafkaTests: XCTestCase { acknowledgements: acks, messages: testMessages ) - producer.shutdownGracefully() + producer.triggerGracefulShutdown() } // Consumer Run Task diff --git a/Tests/SwiftKafkaTests/KafkaProducerTests.swift b/Tests/SwiftKafkaTests/KafkaProducerTests.swift index 3a1bf99c..c97f89d8 100644 --- a/Tests/SwiftKafkaTests/KafkaProducerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaProducerTests.swift @@ -85,7 +85,7 @@ final class KafkaProducerTests: XCTestCase { break } - producer.shutdownGracefully() + producer.triggerGracefulShutdown() } } } @@ -123,7 +123,7 @@ final class KafkaProducerTests: XCTestCase { break } - producer.shutdownGracefully() + producer.triggerGracefulShutdown() } } } @@ -179,7 +179,7 @@ final class KafkaProducerTests: XCTestCase { XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message1.value })) XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message2.value })) - producer.shutdownGracefully() + producer.triggerGracefulShutdown() } } } @@ -196,9 +196,9 @@ final class KafkaProducerTests: XCTestCase { // We have not invoked `producer.run()` yet, which means that our message and its // delivery report callback have been enqueued onto the `librdkafka` `outq`. - // By invoking `shutdownGracefully()` now our `KafkaProducer` should enter the + // By invoking `triggerGracefulShutdown()` now our `KafkaProducer` should enter the // `flushing` state. - producer.shutdownGracefully() + producer.triggerGracefulShutdown() await withThrowingTaskGroup(of: Void.self) { group in // Now that we are in the `flushing` state, start the run loop. @@ -207,7 +207,7 @@ final class KafkaProducerTests: XCTestCase { } // Since we are flushing, we should receive our messageAcknowledgement despite - // having invoked `shutdownGracefully()` before. + // having invoked `triggerGracefulShutdown()` before. group.addTask { var iterator = acks.makeAsyncIterator() let acknowledgement = await iterator.next()! @@ -223,7 +223,7 @@ final class KafkaProducerTests: XCTestCase { func testProducerNotUsableAfterShutdown() async throws { let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) - producer.shutdownGracefully() + producer.triggerGracefulShutdown() await withThrowingTaskGroup(of: Void.self) { group in @@ -261,7 +261,7 @@ final class KafkaProducerTests: XCTestCase { weak var producerCopy = producer - producer?.shutdownGracefully() + producer?.triggerGracefulShutdown() producer = nil // Make sure to terminate the AsyncSequence acks = nil