Refactor KafkaProducer #67

README.md: 6 changes (3 additions, 3 deletions)
@@ -30,7 +30,7 @@ The `send(_:)` method of `KafkaProducer` returns a message-id that can later be
```swift
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])

-let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements(
+let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
config: config,
logger: .kafkaTest // Your logger here
)
@@ -44,7 +44,7 @@ await withThrowingTaskGroup(of: Void.self) { group in

// Task receiving acknowledgements
group.addTask {
-let messageID = try await producer.send(
+let messageID = try producer.send(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
@@ -56,7 +56,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
}

// Required
-await producer.shutdownGracefully()
+producer.triggerGracefulShutdown()
}
}
```
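The hunks above show only fragments of the README example; the part that actually consumes the `acknowledgements` sequence is collapsed. For orientation, a minimal sketch of that consumer side could look like the following; the element type yielded by the sequence is an assumption here, not something shown in this diff.

```swift
// Sketch of the acknowledgement-consuming task (element type assumed, not taken from this diff).
group.addTask {
    for await acknowledgement in acknowledgements {
        // Each element reports the delivery outcome of a previously sent message.
        print("Received acknowledgement: \(acknowledgement)")
    }
}
```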
Sources/SwiftKafka/KafkaClient.swift: 50 changes (50 additions, 0 deletions)
@@ -42,6 +42,47 @@ final class KafkaClient {
rd_kafka_destroy(kafkaHandle)
}

/// Produce a message to the Kafka cluster.
///
/// - Parameter message: The ``KafkaProducerMessage`` that is sent to the Kafka cluster.
/// - Parameter newMessageID: The ID that was assigned to the `message`.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter topicHandles: Topic handles that this client uses to produce new messages.
func produce(
    message: KafkaProducerMessage,
    newMessageID: UInt,
    topicConfig: KafkaTopicConfiguration,
    topicHandles: RDKafkaTopicHandles
) throws {
    let keyBytes: [UInt8]?
    if var key = message.key {
        keyBytes = key.readBytes(length: key.readableBytes)
    } else {
        keyBytes = nil
    }

    let responseCode = try message.value.withUnsafeReadableBytes { valueBuffer in
        return try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in
            // Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
            // Returns 0 on success, error code otherwise.
            return rd_kafka_produce(
                topicHandle,
                message.partition.rawValue,
                RD_KAFKA_MSG_F_COPY,
                UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
                valueBuffer.count,
                keyBytes,
                keyBytes?.count ?? 0,
                UnsafeMutableRawPointer(bitPattern: newMessageID)
            )
        }
    }

    guard responseCode == 0 else {
        throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
    }
}
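A note on the last argument: `newMessageID` is handed to librdkafka as the per-message opaque pointer, which librdkafka returns untouched in the delivery report for that message. The sketch below is not part of this diff and assumes the `Crdkafka` module name; it only illustrates how a delivery-report handler could recover the ID from the message's `_private` field.

```swift
import Crdkafka // C librdkafka bindings (module name assumed)

// Hypothetical delivery-report handler (not from this PR): recover the message ID
// that was passed to rd_kafka_produce as the opaque pointer above.
func handleDeliveryReport(_ messagePointer: UnsafePointer<rd_kafka_message_t>) {
    let message = messagePointer.pointee
    // `_private` carries the opaque value given to rd_kafka_produce.
    let messageID = UInt(bitPattern: message._private)
    if message.err == RD_KAFKA_RESP_ERR_NO_ERROR {
        print("Message \(messageID) was acknowledged by the cluster")
    } else {
        print("Message \(messageID) failed: \(String(cString: rd_kafka_err2str(message.err)))")
    }
}
```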

/// Polls the Kafka client for events.
///
/// Events will cause application-provided callbacks to be called.
@@ -210,10 +251,19 @@
}
}

/// Returns `true` if the underlying `librdkafka` consumer is closed.
var isConsumerClosed: Bool {
rd_kafka_consumer_closed(self.kafkaHandle) == 1
}

/// Returns the current length of the outgoing queue.
///
/// This is the number of producer messages waiting to be sent plus the number of
/// callbacks waiting to be served by invoking `rd_kafka_poll`.
var outgoingQueueSize: Int32 {
return rd_kafka_outq_len(self.kafkaHandle)
}
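The outgoing queue length is the quantity a graceful producer shutdown can watch to know when every buffered message and pending callback has been handled. A hypothetical drain loop along those lines might look like this; the `withKafkaHandlePointer` accessor name and the 10 ms back-off are assumptions, not code from this PR.

```swift
// Sketch of a drain loop for graceful shutdown (helper names assumed, not from this diff).
func drainOutgoingQueue(of client: KafkaClient) async throws {
    while client.outgoingQueueSize > 0 {
        // Serve pending delivery-report callbacks without blocking.
        _ = client.withKafkaHandlePointer { handle in
            rd_kafka_poll(handle, 0)
        }
        // Back off briefly before checking the queue length again.
        try await Task.sleep(nanoseconds: 10_000_000)
    }
}
```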

/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter body: The closure will use the Kafka handle pointer.
Sources/SwiftKafka/KafkaConsumer.swift: 25 changes (9 additions, 16 deletions)
@@ -17,23 +17,15 @@ import Logging
import NIOConcurrencyHelpers
import NIOCore

-// MARK: - NoBackPressure
-
-/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
-struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
-    func didYield(bufferDepth: Int) -> Bool { true }
-    func didConsume(bufferDepth: Int) -> Bool { true }
-}
-
-// MARK: - ShutDownOnTerminate
+// MARK: - KafkaConsumerShutDownOnTerminate

/// `NIOAsyncSequenceProducerDelegate` that shuts the consumer down when
/// `didTerminate()` is invoked.
-struct ShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored property is protected by a lock
+internal struct KafkaConsumerShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored property is protected by a lock
let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
}

-extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
+extension KafkaConsumerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
func produceMore() {
// No back pressure
return
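The remainder of this extension, including `didTerminate()`, sits in the collapsed part of this hunk. Purely as an illustration of what the doc comment above describes, a hypothetical implementation could forward the termination to the state machine; the transition name below is invented, not the PR's actual code.

```swift
// Hypothetical didTerminate() sketch (the real body is in the collapsed lines of this hunk).
func didTerminate() {
    self.stateMachine.withLockedValue { stateMachine in
        // Assumed state-machine transition; the actual API may differ.
        stateMachine.messageSequenceTerminated()
    }
}
```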
@@ -68,7 +60,8 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
/// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
public struct KafkaConsumerMessages: AsyncSequence {
public typealias Element = KafkaConsumerMessage
-typealias WrappedSequence = NIOAsyncSequenceProducer<Element, NoBackPressure, ShutdownOnTerminate>
+typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
+typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaConsumerShutdownOnTerminate>
let wrappedSequence: WrappedSequence

/// `AsyncIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
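Because `KafkaConsumerMessages` is an ordinary `AsyncSequence`, consuming it is a plain asynchronous for-loop. A minimal usage sketch, assuming a `consumer: KafkaConsumer` created elsewhere:

```swift
// Minimal consumption sketch (assumes `consumer` is an already-configured KafkaConsumer).
for await message in consumer.messages {
    // Each element is a KafkaConsumerMessage received from the subscribed topics.
    print("Received message: \(message)")
}
```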
@@ -91,8 +84,8 @@ public struct KafkaConsumerMessages: AsyncSequence {
public final class KafkaConsumer {
typealias Producer = NIOAsyncSequenceProducer<
KafkaConsumerMessage,
-NoBackPressure,
-ShutdownOnTerminate
+NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
+KafkaConsumerShutdownOnTerminate
>
/// The configuration object of the consumer client.
private var config: KafkaConsumerConfiguration
@@ -123,8 +116,8 @@

let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
elementType: KafkaConsumerMessage.self,
-backPressureStrategy: NoBackPressure(),
-delegate: ShutdownOnTerminate(stateMachine: self.stateMachine)
+backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
+delegate: KafkaConsumerShutdownOnTerminate(stateMachine: self.stateMachine)
)

self.messages = KafkaConsumerMessages(