Create wrapper for Kafka topic handle dict
Modifications:

* create new class `RDKafkaTopicHandles` that wraps a dictionary
  containing all topic names with their respective `rd_kafka_topic_t` handles
* create method `KafkaClient.produce` that wraps the C function `rd_kafka_produce`
  behind a Swift-friendly API (see the usage sketch below)
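
A minimal usage sketch of the new API (illustrative only, not part of this commit; `client`, `message`, and `topicConfig` stand in for values the producer already holds):

// Hypothetical call site: send a message through the new Swift wrapper
// instead of calling rd_kafka_produce directly.
let topicHandles = RDKafkaTopicHandles(client: client)
try client.produce(
    message: message,           // KafkaProducerMessage to be sent
    newMessageID: 1,            // ID assigned to this message by the producer
    topicConfig: topicConfig,   // KafkaTopicConfiguration for newly created topics
    topicHandles: topicHandles
)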
felixschlegel committed Jun 29, 2023
1 parent ab68bae commit ce6a38e
Showing 3 changed files with 137 additions and 134 deletions.
41 changes: 41 additions & 0 deletions Sources/SwiftKafka/KafkaClient.swift
@@ -42,6 +42,47 @@ final class KafkaClient {
rd_kafka_destroy(kafkaHandle)
}

/// Produce a message to the Kafka cluster.
///
/// - Parameter message: The ``KafkaProducerMessage`` that is sent to the Kafka cluster.
/// - Parameter newMessageID: ID that was assigned to the `message`.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter topicHandles: Topic handles that this client uses to produce new messages.
func produce(
message: KafkaProducerMessage,
newMessageID: UInt,
topicConfig: KafkaTopicConfiguration,
topicHandles: RDKafkaTopicHandles
) throws {
let keyBytes: [UInt8]?
if var key = message.key {
keyBytes = key.readBytes(length: key.readableBytes)
} else {
keyBytes = nil
}

let responseCode = try message.value.withUnsafeReadableBytes { valueBuffer in
return try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
// Returns 0 on success, error code otherwise.
return rd_kafka_produce(
topicHandle,
message.partition.rawValue,
RD_KAFKA_MSG_F_COPY,
UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
valueBuffer.count,
keyBytes,
keyBytes?.count ?? 0,
UnsafeMutableRawPointer(bitPattern: newMessageID)
)
}
}

guard responseCode == 0 else {
throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
}
}

/// Polls the Kafka client for events.
///
/// Events will cause application-provided callbacks to be called.
151 changes: 17 additions & 134 deletions Sources/SwiftKafka/KafkaProducer.swift
@@ -34,11 +34,10 @@ extension KafkaProducerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
func didTerminate() {
let action = self.stateMachine.withLockedValue { $0.finish() }
switch action {
- case .shutdownGracefullyAndFinishSource(let client, let source, let topicHandles):
+ case .shutdownGracefullyAndFinishSource(let client, let source):
Task {
await KafkaProducer._shutDownGracefully(
client: client,
- topicHandles: topicHandles,
source: source,
timeout: 10000
)
@@ -215,10 +214,9 @@ public final class KafkaProducer {
public func shutdownGracefully(timeout: Int32 = 10000) async {
let action = self.stateMachine.withLockedValue { $0.finish() }
switch action {
- case .shutdownGracefullyAndFinishSource(let client, let source, let topicHandles):
+ case .shutdownGracefullyAndFinishSource(let client, let source):
await KafkaProducer._shutDownGracefully(
client: client,
- topicHandles: topicHandles,
source: source,
timeout: timeout
)
@@ -230,7 +228,6 @@ public final class KafkaProducer {
// Static so we perform this without needing a reference to `KafkaProducer`
static func _shutDownGracefully(
client: KafkaClient,
- topicHandles: [String: OpaquePointer],
source: Producer.Source?,
timeout: Int32
) async {
@@ -243,10 +240,6 @@ public final class KafkaProducer {
continuation.resume()
}
}

- for (_, topicHandle) in topicHandles {
- rd_kafka_topic_destroy(topicHandle)
- }
}

/// Start polling Kafka for acknowledged messages.
@@ -277,76 +270,16 @@ public final class KafkaProducer {
public func send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
let action = try self.stateMachine.withLockedValue { try $0.send() }
switch action {
- case .send(let client, let newMessageID):
- try self._send(
- client: client,
+ case .send(let client, let newMessageID, let topicHandles):
+ try client.produce(
message: message,
- newMessageID: newMessageID
+ newMessageID: newMessageID,
+ topicConfig: self.topicConfig,
+ topicHandles: topicHandles
)
return KafkaProducerMessageID(rawValue: newMessageID)
}
}

- private func _send(
- client: KafkaClient,
- message: KafkaProducerMessage,
- newMessageID: UInt
- ) throws {
- let topicHandle = try self._createTopicHandleIfNeeded(topic: message.topic)
-
- let keyBytes: [UInt8]?
- if var key = message.key {
- keyBytes = key.readBytes(length: key.readableBytes)
- } else {
- keyBytes = nil
- }
-
- let responseCode = message.value.withUnsafeReadableBytes { valueBuffer in
-
- // Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
- // Returns 0 on success, error code otherwise.
- return rd_kafka_produce(
- topicHandle,
- message.partition.rawValue,
- RD_KAFKA_MSG_F_COPY,
- UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
- valueBuffer.count,
- keyBytes,
- keyBytes?.count ?? 0,
- UnsafeMutableRawPointer(bitPattern: newMessageID)
- )
- }
-
- guard responseCode == 0 else {
- throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
- }
- }
-
- /// Check `topicHandles` for a handle matching the topic name and create a new handle if needed.
- /// - Parameter topic: The name of the topic that is addressed.
- private func _createTopicHandleIfNeeded(topic: String) throws -> OpaquePointer? {
- try self.stateMachine.withLockedValue { state in
- let action = try state.createTopicHandleIfNeeded(topic: topic)
- switch action {
- case .handleExists(let handle):
- return handle
- case .createTopicHandle(let client, let topic):
- let newHandle = try client.withKafkaHandlePointer { handle in
- let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfig: self.topicConfig)
- return rd_kafka_topic_new(
- handle,
- topic,
- rdTopicConf
- )
- // rd_kafka_topic_new deallocates topic config object
- }
- if let newHandle {
- try state.addTopicHandle(topic: topic, handle: newHandle)
- }
- return newHandle
- }
- }
- }
}

// MARK: - KafkaProducer + StateMachine
@@ -367,12 +300,12 @@ extension KafkaProducer {
/// - Parameter messageIDCounter:Used to incrementally assign unique IDs to messages.
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
- /// - Parameter topicHandles: Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer.
+ /// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
case started(
client: KafkaClient,
messageIDCounter: UInt,
source: Producer.Source?,
- topicHandles: [String: OpaquePointer]
+ topicHandles: RDKafkaTopicHandles
)
/// The ``KafkaProducer`` has been shut down and cannot be used anymore.
case finished
@@ -394,7 +327,7 @@ extension KafkaProducer {
client: client,
messageIDCounter: 0,
source: source,
- topicHandles: [:]
+ topicHandles: RDKafkaTopicHandles(client: client)
)
}

@@ -421,63 +354,15 @@ extension KafkaProducer {
}
}

- /// Action to take when wanting to create a topic handle.
- enum CreateTopicHandleAction {
- /// Do create a new topic handle.
- case createTopicHandle(
- client: KafkaClient,
- topic: String
- )
- /// No need to create a new handle. It exists already: `handle`.
- case handleExists(handle: OpaquePointer)
- }
-
- /// Returns action to be taken when wanting to create a new topic handle.
- func createTopicHandleIfNeeded(topic: String) throws -> CreateTopicHandleAction {
- switch self.state {
- case .uninitialized:
- fatalError("\(#function) invoked while still in state \(self.state)")
- case .started(let client, _, _, let topicHandles):
- if let handle = topicHandles[topic] {
- return .handleExists(handle: handle)
- } else {
- return .createTopicHandle(client: client, topic: topic)
- }
- case .finished:
- throw KafkaError.connectionClosed(reason: "Tried to create topic handle on closed connection")
- }
- }
-
- /// Add a newly created topic handle to the list of topic handles contained in the state machine.
- mutating func addTopicHandle(
- topic: String,
- handle: OpaquePointer
- ) throws {
- switch self.state {
- case .uninitialized:
- fatalError("\(#function) invoked while still in state \(self.state)")
- case .started(let client, let messageIDCounter, let source, let topicHandles):
- var topicHandles = topicHandles
- topicHandles[topic] = handle
- self.state = .started(
- client: client,
- messageIDCounter: messageIDCounter,
- source: source,
- topicHandles: topicHandles
- )
- case .finished:
- throw KafkaError.connectionClosed(reason: "Tried to create topic handle on closed connection")
- }
- }

/// Action to be taken when wanting to send a message.
enum SendAction {
/// Send the message.
///
/// - Important: `newMessageID` is the new message ID assigned to the message to be sent.
case send(
client: KafkaClient,
- newMessageID: UInt
+ newMessageID: UInt,
+ topicHandles: RDKafkaTopicHandles
)
}

@@ -498,7 +383,8 @@ extension KafkaProducer {
)
return .send(
client: client,
- newMessageID: newMessageID
+ newMessageID: newMessageID,
+ topicHandles: topicHandles
)
case .finished:
throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer")
@@ -511,11 +397,9 @@ extension KafkaProducer {
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
- /// - Parameter topicHandles: Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer.
case shutdownGracefullyAndFinishSource(
client: KafkaClient,
- source: Producer.Source?,
- topicHandles: [String: OpaquePointer]
+ source: Producer.Source?
)
}

@@ -527,12 +411,11 @@ extension KafkaProducer {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
- case .started(let client, _, let source, let topicHandles):
+ case .started(let client, _, let source, _):
self.state = .finished
return .shutdownGracefullyAndFinishSource(
client: client,
- source: source,
- topicHandles: topicHandles
+ source: source
)
case .finished:
return nil
79 changes: 79 additions & 0 deletions Sources/SwiftKafka/RDKafka/RDKafkaTopicHandles.swift
@@ -0,0 +1,79 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Crdkafka

/// Swift class that matches topic names with their respective `rd_kafka_topic_t` handles.
internal class RDKafkaTopicHandles {
private var _internal: [String: OpaquePointer]

// Note: we retain the client to ensure it does not get
// deinitialized before rd_kafka_topic_destroy() is invoked (required)
private let client: KafkaClient

init(client: KafkaClient) {
self._internal = [:]
self.client = client
}

deinit {
for (_, topicHandle) in self._internal {
rd_kafka_topic_destroy(topicHandle)
}
}

/// Scoped accessor that enables safe access to the pointer of the topic's handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter topic: The name of the topic that is addressed.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter body: The closure that will use the topic handle pointer.
@discardableResult
func withTopicHandlePointer<T>(
topic: String,
topicConfig: KafkaTopicConfiguration,
_ body: (OpaquePointer) throws -> T
) throws -> T {
let topicHandle = try self.createTopicHandleIfNeeded(topic: topic, topicConfig: topicConfig)
return try body(topicHandle)
}

/// Check the dictionary for a handle matching the topic name and create a new handle if needed.
/// - Parameter topic: The name of the topic that is addressed.
private func createTopicHandleIfNeeded(
topic: String,
topicConfig: KafkaTopicConfiguration
) throws -> OpaquePointer {
if let handle = self._internal[topic] {
return handle
} else {
let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfig: topicConfig)
let newHandle = self.client.withKafkaHandlePointer { kafkaHandle in
rd_kafka_topic_new(
kafkaHandle,
topic,
rdTopicConf
)
// rd_kafka_topic_new deallocates topic config object
}

guard let newHandle else {
// newHandle is nil, so we can retrieve error through rd_kafka_last_error()
let error = KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
throw error
}
self._internal[topic] = newHandle
return newHandle
}
}
}
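
A minimal sketch of how the scoped accessor might be used (illustrative only, not part of this commit; `client` and `topicConfig` are placeholders for values the caller already holds):

import Crdkafka

// Hypothetical caller: the rd_kafka_topic_t pointer is only valid inside the closure.
let topicHandles = RDKafkaTopicHandles(client: client)
let topicName = try topicHandles.withTopicHandlePointer(
    topic: "test-topic",
    topicConfig: topicConfig
) { topicHandle in
    // Use the handle while the closure keeps it alive; do not escape it.
    String(cString: rd_kafka_topic_name(topicHandle))
}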
