Skip to content

Commit

Permalink
Refactor KafkaProducer (#67)
Browse files Browse the repository at this point in the history
* Refactor `KafkaProducer`

Motiviation:

* align `KafkaProducer` more with proposed changes to `KafkaConsumer`
* `AsyncStream` was not handling `AsyncSequence` termination handling as
  we wanted it to, so revert back to use `NIOAsyncSequenceProducer`

Modifications:

* make `KafkaProducer` `final class` instead of `actor`
* `KafkaProducer`: use `NIOAsyncSequenceProducer` instead of
  `AsyncSequence` for better termination handling -> shutdown
  `KafkaProducer` on termination of the `AsyncSequence`
* introduce `StateMachine` to `KafkaProducer`
* move internal state of `KafkaProducer` to `KafkaProducer.StateMachine`
* remove unused `await` expressions when accessing `KafkaProducer`
* update tests
* update `README`

* * rename KafkaProducer.StateMachine.State.shutDown to .finished

* Remove unused awaits

* KafkaProducer: move logger out of state

* KafkaProducer: rename `killPollLoop` -> `terminatePollLoop`

* Fix errors after rebase

Modifications:

* move `NoBackPressure` struct to `extension` of
  `NIOAsyncSequenceProducerBackPressureStrategies`
* break down duplicate `ShutDownOnTerminate` type into two more
  specialised types for `KafkaConsumer` and `KafkaProducer`
* add missing `config` parameter to `KafkaProducer`'s initialiser

* Create wrapper for Kafka topic handle dict

Modifications:

* create new class `RDKafkaTopicHandles` that wraps a dictionary
  containing all topic names with their respective `rd_kafka_topic_t` handles
* create method `KafkaClient.produce` wrapping the `rd_kafka_produce`
  method in a Swift way

* Own implementation of `rd_kafka_flush()`

Modifications:

* `KafkaClient`: add new property `outgoingQueueSize`
* `KafkaProducer.StateMachine`: add new state `.flushing`
* `KafkaProducer.shutdownGracefully()`:
    * make non-async
    * remove invocation to `rd_kafka_flush`
    * set state to `KafkaProducer.StateMachine.State` to `.flushing`
* `KafkaProducer` poll loop:
    * poll as long as `outgoingQueueSize` is > 0 to send out any
      enqueued `KafkaProducerMessage`s and serve any enqueued callbacks
* `KafkaProducerTests`: add test asserting that the `librdkafka` `outq`
  is still being served after `KafkaProducer.shutdownGracefully` has
  been invoked as long as there are enqueued items

* Review Franz

Modifications:

* rename `KafkaProducer.shutdownGracefully` to
  `KafkaProducer.triggerGracefulShutdown`
* `KafkaProducer.send` separate error message when in state `.flushing`
  • Loading branch information
felixschlegel authored Jul 3, 2023
1 parent 588af60 commit 5ed295f
Show file tree
Hide file tree
Showing 8 changed files with 455 additions and 176 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ The `send(_:)` method of `KafkaProducer` returns a message-id that can later be
```swift
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])

let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements(
let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
config: config,
logger: .kafkaTest // Your logger here
)
Expand All @@ -44,7 +44,7 @@ await withThrowingTaskGroup(of: Void.self) { group in

// Task receiving acknowledgements
group.addTask {
let messageID = try await producer.send(
let messageID = try producer.send(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
Expand All @@ -56,7 +56,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
}

// Required
await producer.shutdownGracefully()
producer.triggerGracefulShutdown()
}
}
```
Expand Down
50 changes: 50 additions & 0 deletions Sources/SwiftKafka/KafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,47 @@ final class KafkaClient {
rd_kafka_destroy(kafkaHandle)
}

/// Produce a message to the Kafka cluster.
///
/// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster.
/// - Parameter newMessageID: ID that was assigned to the `message`.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter topicHandles: Topic handles that this client uses to produce new messages
func produce(
message: KafkaProducerMessage,
newMessageID: UInt,
topicConfig: KafkaTopicConfiguration,
topicHandles: RDKafkaTopicHandles
) throws {
let keyBytes: [UInt8]?
if var key = message.key {
keyBytes = key.readBytes(length: key.readableBytes)
} else {
keyBytes = nil
}

let responseCode = try message.value.withUnsafeReadableBytes { valueBuffer in
return try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
// Returns 0 on success, error code otherwise.
return rd_kafka_produce(
topicHandle,
message.partition.rawValue,
RD_KAFKA_MSG_F_COPY,
UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
valueBuffer.count,
keyBytes,
keyBytes?.count ?? 0,
UnsafeMutableRawPointer(bitPattern: newMessageID)
)
}
}

guard responseCode == 0 else {
throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
}
}

/// Polls the Kafka client for events.
///
/// Events will cause application-provided callbacks to be called.
Expand Down Expand Up @@ -210,10 +251,19 @@ final class KafkaClient {
}
}

/// Returns `true` if the underlying `librdkafka` consumer is closed.
var isConsumerClosed: Bool {
rd_kafka_consumer_closed(self.kafkaHandle) == 1
}

/// Returns the current out queue length.
///
/// This means the number of producer messages that wait to be sent + the number of any
/// callbacks that are waiting to be executed by invoking `rd_kafka_poll`.
var outgoingQueueSize: Int32 {
return rd_kafka_outq_len(self.kafkaHandle)
}

/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter body: The closure will use the Kafka handle pointer.
Expand Down
25 changes: 9 additions & 16 deletions Sources/SwiftKafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,15 @@ import Logging
import NIOConcurrencyHelpers
import NIOCore

// MARK: - NoBackPressure

/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
func didYield(bufferDepth: Int) -> Bool { true }
func didConsume(bufferDepth: Int) -> Bool { true }
}

// MARK: - ShutDownOnTerminate
// MARK: - KafkaConsumerShutDownOnTerminate

/// `NIOAsyncSequenceProducerDelegate` that terminates the shuts the consumer down when
/// `didTerminate()` is invoked.
struct ShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock
internal struct KafkaConsumerShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock
let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
}

extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
extension KafkaConsumerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
func produceMore() {
// No back pressure
return
Expand Down Expand Up @@ -68,7 +60,8 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
/// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
public struct KafkaConsumerMessages: AsyncSequence {
public typealias Element = KafkaConsumerMessage
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, NoBackPressure, ShutdownOnTerminate>
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaConsumerShutdownOnTerminate>
let wrappedSequence: WrappedSequence

/// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
Expand All @@ -91,8 +84,8 @@ public struct KafkaConsumerMessages: AsyncSequence {
public final class KafkaConsumer {
typealias Producer = NIOAsyncSequenceProducer<
KafkaConsumerMessage,
NoBackPressure,
ShutdownOnTerminate
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
KafkaConsumerShutdownOnTerminate
>
/// The configuration object of the consumer client.
private var config: KafkaConsumerConfiguration
Expand Down Expand Up @@ -123,8 +116,8 @@ public final class KafkaConsumer {

let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
elementType: KafkaConsumerMessage.self,
backPressureStrategy: NoBackPressure(),
delegate: ShutdownOnTerminate(stateMachine: self.stateMachine)
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
delegate: KafkaConsumerShutdownOnTerminate(stateMachine: self.stateMachine)
)

self.messages = KafkaConsumerMessages(
Expand Down
Loading

0 comments on commit 5ed295f

Please sign in to comment.