Skip to content

Commit

Permalink
Review Franz
Browse files Browse the repository at this point in the history
Modifications:

* rename `KafkaProducer.shutdownGracefully` to
  `KafkaProducer.triggerGracefulShutdown`
* `KafkaProducer.send` separate error message when in state `.flushing`
  • Loading branch information
felixschlegel committed Jun 30, 2023
1 parent dd9ea2f commit c9b4d03
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
}

// Required
producer.shutdownGracefully()
producer.triggerGracefulShutdown()
}
}
```
Expand Down
10 changes: 6 additions & 4 deletions Sources/SwiftKafka/KafkaProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public struct KafkaMessageAcknowledgements: AsyncSequence {
}

/// Send messages to the Kafka cluster.
/// Please make sure to explicitly call ``shutdownGracefully()`` when the ``KafkaProducer`` is not used anymore.
/// Please make sure to explicitly call ``triggerGracefulShutdown()`` when the ``KafkaProducer`` is not used anymore.
/// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfiguration``
/// configuration object (only works if server has `auto.create.topics.enable` property set).
public final class KafkaProducer {
Expand Down Expand Up @@ -198,7 +198,7 @@ public final class KafkaProducer {
///
/// This method flushes any buffered messages and waits until a callback is received for all of them.
/// Afterwards, it shuts down the connection to Kafka and cleans any remaining state up.
public func shutdownGracefully() {
public func triggerGracefulShutdown() { // TODO(felix): make internal once we adapt swift-service-lifecycle
self.stateMachine.withLockedValue { $0.finish() }
}

Expand Down Expand Up @@ -269,7 +269,7 @@ extension KafkaProducer {
source: Producer.Source?,
topicHandles: RDKafkaTopicHandles
)
/// ``KafkaProducer/shutdownGracefully()`` was invoked so we are flushing
/// ``KafkaProducer/triggerGracefulShutdown()`` was invoked so we are flushing
/// any messages that wait to be sent and serve any remaining queued callbacks.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
Expand Down Expand Up @@ -368,7 +368,9 @@ extension KafkaProducer {
newMessageID: newMessageID,
topicHandles: topicHandles
)
case .flushing, .finished:
case .flushing:
throw KafkaError.connectionClosed(reason: "Producer in the process of flushing and shutting down")
case .finished:
throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer")
}
}
Expand Down
6 changes: 3 additions & 3 deletions Tests/IntegrationTests/SwiftKafkaTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ final class SwiftKafkaTests: XCTestCase {
acknowledgements: acks,
messages: testMessages
)
producer.shutdownGracefully()
producer.triggerGracefulShutdown()
}

// Consumer Run Task
Expand Down Expand Up @@ -167,7 +167,7 @@ final class SwiftKafkaTests: XCTestCase {
acknowledgements: acks,
messages: testMessages
)
producer.shutdownGracefully()
producer.triggerGracefulShutdown()
}

// Consumer Run Task
Expand Down Expand Up @@ -230,7 +230,7 @@ final class SwiftKafkaTests: XCTestCase {
acknowledgements: acks,
messages: testMessages
)
producer.shutdownGracefully()
producer.triggerGracefulShutdown()
}

// Consumer Run Task
Expand Down
16 changes: 8 additions & 8 deletions Tests/SwiftKafkaTests/KafkaProducerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ final class KafkaProducerTests: XCTestCase {
break
}

producer.shutdownGracefully()
producer.triggerGracefulShutdown()
}
}
}
Expand Down Expand Up @@ -123,7 +123,7 @@ final class KafkaProducerTests: XCTestCase {
break
}

producer.shutdownGracefully()
producer.triggerGracefulShutdown()
}
}
}
Expand Down Expand Up @@ -179,7 +179,7 @@ final class KafkaProducerTests: XCTestCase {
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message1.value }))
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message2.value }))

producer.shutdownGracefully()
producer.triggerGracefulShutdown()
}
}
}
Expand All @@ -196,9 +196,9 @@ final class KafkaProducerTests: XCTestCase {

// We have not invoked `producer.run()` yet, which means that our message and its
// delivery report callback have been enqueued onto the `librdkafka` `outq`.
// By invoking `shutdownGracefully()` now our `KafkaProducer` should enter the
// By invoking `triggerGracefulShutdown()` now our `KafkaProducer` should enter the
// `flushing` state.
producer.shutdownGracefully()
producer.triggerGracefulShutdown()

await withThrowingTaskGroup(of: Void.self) { group in
// Now that we are in the `flushing` state, start the run loop.
Expand All @@ -207,7 +207,7 @@ final class KafkaProducerTests: XCTestCase {
}

// Since we are flushing, we should receive our messageAcknowledgement despite
// having invoked `shutdownGracefully()` before.
// having invoked `triggerGracefulShutdown()` before.
group.addTask {
var iterator = acks.makeAsyncIterator()
let acknowledgement = await iterator.next()!
Expand All @@ -223,7 +223,7 @@ final class KafkaProducerTests: XCTestCase {

func testProducerNotUsableAfterShutdown() async throws {
let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
producer.shutdownGracefully()
producer.triggerGracefulShutdown()

await withThrowingTaskGroup(of: Void.self) { group in

Expand Down Expand Up @@ -261,7 +261,7 @@ final class KafkaProducerTests: XCTestCase {

weak var producerCopy = producer

producer?.shutdownGracefully()
producer?.triggerGracefulShutdown()
producer = nil
// Make sure to terminate the AsyncSequence
acks = nil
Expand Down

0 comments on commit c9b4d03

Please sign in to comment.