Skip to content

Commit

Permalink
Own implementation of rd_kafka_flush()
Browse files Browse the repository at this point in the history
Modifications:

* `KafkaClient`: add new property `outgoingQueueSize`
* `KafkaProducer.StateMachine`: add new state `.flushing`
* `KafkaProducer.shutdownGracefully()`:
    * make non-async
    * remove invocation to `rd_kafka_flush`
    * set state to `KafkaProducer.StateMachine.State` to `.flushing`
* `KafkaProducer` poll loop:
    * poll as long as `outgoingQueueSize` is > 0 to send out any
      enqueued `KafkaProducerMessage`s and serve any enqueued callbacks
* `KafkaProducerTests`: add test asserting that the `librdkafka` `outq`
  is still being served after `KafkaProducer.shutdownGracefully` has
  been invoked as long as there are enqueued items
  • Loading branch information
felixschlegel committed Jun 30, 2023
1 parent 387fc03 commit dd9ea2f
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 77 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
}

// Required
await producer.shutdownGracefully()
producer.shutdownGracefully()
}
}
```
Expand Down
9 changes: 9 additions & 0 deletions Sources/SwiftKafka/KafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,19 @@ final class KafkaClient {
}
}

/// Returns `true` if the underlying `librdkafka` consumer is closed.
var isConsumerClosed: Bool {
rd_kafka_consumer_closed(self.kafkaHandle) == 1
}

/// Returns the current out queue length.
///
/// This means the number of producer messages that wait to be sent + the number of any
/// callbacks that are waiting to be executed by invoking `rd_kafka_poll`.
var outgoingQueueSize: Int32 {
return rd_kafka_outq_len(self.kafkaHandle)
}

/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter body: The closure will use the Kafka handle pointer.
Expand Down
101 changes: 33 additions & 68 deletions Sources/SwiftKafka/KafkaProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,7 @@ extension KafkaProducerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
}

func didTerminate() {
let action = self.stateMachine.withLockedValue { $0.finish() }
switch action {
case .shutdownGracefullyAndFinishSource(let client, let source):
Task {
await KafkaProducer._shutDownGracefully(
client: client,
source: source,
timeout: 10000
)
}
case .none:
return
}
self.stateMachine.withLockedValue { $0.finish() }
}
}

Expand Down Expand Up @@ -72,7 +60,7 @@ public struct KafkaMessageAcknowledgements: AsyncSequence {
}

/// Send messages to the Kafka cluster.
/// Please make sure to explicitly call ``shutdownGracefully(timeout:)`` when the ``KafkaProducer`` is not used anymore.
/// Please make sure to explicitly call ``shutdownGracefully()`` when the ``KafkaProducer`` is not used anymore.
/// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfiguration``
/// configuration object (only works if server has `auto.create.topics.enable` property set).
public final class KafkaProducer {
Expand Down Expand Up @@ -210,49 +198,23 @@ public final class KafkaProducer {
///
/// This method flushes any buffered messages and waits until a callback is received for all of them.
/// Afterwards, it shuts down the connection to Kafka and cleans any remaining state up.
/// - Parameter timeout: Maximum amount of milliseconds this method waits for any outstanding messages to be sent.
public func shutdownGracefully(timeout: Int32 = 10000) async {
let action = self.stateMachine.withLockedValue { $0.finish() }
switch action {
case .shutdownGracefullyAndFinishSource(let client, let source):
await KafkaProducer._shutDownGracefully(
client: client,
source: source,
timeout: timeout
)
case .none:
return
}
}

// Static so we perform this without needing a reference to `KafkaProducer`
static func _shutDownGracefully(
client: KafkaClient,
source: Producer.Source?,
timeout: Int32
) async {
source?.finish()

await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
// Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called
client.withKafkaHandlePointer { handle in
rd_kafka_flush(handle, timeout)
continuation.resume()
}
}
public func shutdownGracefully() {
self.stateMachine.withLockedValue { $0.finish() }
}

/// Start polling Kafka for acknowledged messages.
///
/// - Returns: An awaitable task representing the execution of the poll loop.
public func run() async throws {
// TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle)
while !Task.isCancelled {
let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() }
switch nextAction {
case .poll(let client):
client.poll(timeout: 0)
try await Task.sleep(for: self.config.pollInterval)
case .terminatePollLoopAndFinishSource(let source):
source?.finish()
return
case .terminatePollLoop:
return
}
Expand Down Expand Up @@ -307,6 +269,15 @@ extension KafkaProducer {
source: Producer.Source?,
topicHandles: RDKafkaTopicHandles
)
/// ``KafkaProducer/shutdownGracefully()`` was invoked so we are flushing
/// any messages that wait to be sent and serve any remaining queued callbacks.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case flushing(
client: KafkaClient,
source: Producer.Source?
)
/// The ``KafkaProducer`` has been shut down and cannot be used anymore.
case finished
}
Expand Down Expand Up @@ -335,6 +306,10 @@ extension KafkaProducer {
enum PollLoopAction {
/// Poll client for new consumer messages.
case poll(client: KafkaClient)
/// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`.
///
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case terminatePollLoopAndFinishSource(source: Producer.Source?)
/// Terminate the poll loop.
case terminatePollLoop
}
Expand All @@ -343,12 +318,19 @@ extension KafkaProducer {
/// - Returns: The next action to be taken, either polling or terminating the poll loop.
///
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
func nextPollLoopAction() -> PollLoopAction {
mutating func nextPollLoopAction() -> PollLoopAction {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .started(let client, _, _, _):
return .poll(client: client)
case .flushing(let client, let source):
if client.outgoingQueueSize > 0 {
return .poll(client: client)
} else {
self.state = .finished
return .terminatePollLoopAndFinishSource(source: source)
}
case .finished:
return .terminatePollLoop
}
Expand Down Expand Up @@ -386,39 +368,22 @@ extension KafkaProducer {
newMessageID: newMessageID,
topicHandles: topicHandles
)
case .finished:
case .flushing, .finished:
throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer")
}
}

/// Action to be taken when wanting to do close the producer.
enum FinishAction {
/// Shut down the ``KafkaProducer`` and finish the given `source` object.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case shutdownGracefullyAndFinishSource(
client: KafkaClient,
source: Producer.Source?
)
}

/// Get action to be taken when wanting to do close the producer.
/// - Returns: The action to be taken, or `nil` if there is no action to be taken.
///
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
mutating func finish() -> FinishAction? {
mutating func finish() {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .started(let client, _, let source, _):
self.state = .finished
return .shutdownGracefullyAndFinishSource(
client: client,
source: source
)
case .finished:
return nil
self.state = .flushing(client: client, source: source)
case .flushing, .finished:
break
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions Tests/IntegrationTests/SwiftKafkaTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ final class SwiftKafkaTests: XCTestCase {
acknowledgements: acks,
messages: testMessages
)
await producer.shutdownGracefully()
producer.shutdownGracefully()
}

// Consumer Run Task
Expand Down Expand Up @@ -167,7 +167,7 @@ final class SwiftKafkaTests: XCTestCase {
acknowledgements: acks,
messages: testMessages
)
await producer.shutdownGracefully()
producer.shutdownGracefully()
}

// Consumer Run Task
Expand Down Expand Up @@ -230,7 +230,7 @@ final class SwiftKafkaTests: XCTestCase {
acknowledgements: acks,
messages: testMessages
)
await producer.shutdownGracefully()
producer.shutdownGracefully()
}

// Consumer Run Task
Expand Down
47 changes: 42 additions & 5 deletions Tests/SwiftKafkaTests/KafkaProducerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ final class KafkaProducerTests: XCTestCase {
break
}

await producer.shutdownGracefully()
producer.shutdownGracefully()
}
}
}
Expand Down Expand Up @@ -123,7 +123,7 @@ final class KafkaProducerTests: XCTestCase {
break
}

await producer.shutdownGracefully()
producer.shutdownGracefully()
}
}
}
Expand Down Expand Up @@ -179,14 +179,51 @@ final class KafkaProducerTests: XCTestCase {
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message1.value }))
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message2.value }))

await producer.shutdownGracefully()
producer.shutdownGracefully()
}
}
}

func testFlushQueuedProducerMessages() async throws {
let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)

let message = KafkaProducerMessage(
topic: "test-topic",
key: "key",
value: "Hello, World!"
)
let messageID = try producer.send(message)

// We have not invoked `producer.run()` yet, which means that our message and its
// delivery report callback have been enqueued onto the `librdkafka` `outq`.
// By invoking `shutdownGracefully()` now our `KafkaProducer` should enter the
// `flushing` state.
producer.shutdownGracefully()

await withThrowingTaskGroup(of: Void.self) { group in
// Now that we are in the `flushing` state, start the run loop.
group.addTask {
try await producer.run()
}

// Since we are flushing, we should receive our messageAcknowledgement despite
// having invoked `shutdownGracefully()` before.
group.addTask {
var iterator = acks.makeAsyncIterator()
let acknowledgement = await iterator.next()!
switch acknowledgement {
case .success(let acknowledgedMessage):
XCTAssertEqual(messageID, acknowledgedMessage.id)
case .failure(let error):
XCTFail("Unexpected acknowledgement error: \(error)")
}
}
}
}

func testProducerNotUsableAfterShutdown() async throws {
let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
await producer.shutdownGracefully()
producer.shutdownGracefully()

await withThrowingTaskGroup(of: Void.self) { group in

Expand Down Expand Up @@ -224,7 +261,7 @@ final class KafkaProducerTests: XCTestCase {

weak var producerCopy = producer

await producer?.shutdownGracefully()
producer?.shutdownGracefully()
producer = nil
// Make sure to terminate the AsyncSequence
acks = nil
Expand Down

0 comments on commit dd9ea2f

Please sign in to comment.