diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index 5dca295b..ef3cd330 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -42,6 +42,47 @@ final class KafkaClient { rd_kafka_destroy(kafkaHandle) } + /// Produce a message to the Kafka cluster. + /// + /// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster. + /// - Parameter newMessageID: ID that was assigned to the `message`. + /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. + /// - Parameter topicHandles: Topic handles that this client uses to produce new messages + func produce( + message: KafkaProducerMessage, + newMessageID: UInt, + topicConfig: KafkaTopicConfiguration, + topicHandles: RDKafkaTopicHandles + ) throws { + let keyBytes: [UInt8]? + if var key = message.key { + keyBytes = key.readBytes(length: key.readableBytes) + } else { + keyBytes = nil + } + + let responseCode = try message.value.withUnsafeReadableBytes { valueBuffer in + return try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in + // Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster. + // Returns 0 on success, error code otherwise. + return rd_kafka_produce( + topicHandle, + message.partition.rawValue, + RD_KAFKA_MSG_F_COPY, + UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress), + valueBuffer.count, + keyBytes, + keyBytes?.count ?? 0, + UnsafeMutableRawPointer(bitPattern: newMessageID) + ) + } + } + + guard responseCode == 0 else { + throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error()) + } + } + /// Polls the Kafka client for events. /// /// Events will cause application-provided callbacks to be called. diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 42f6b22f..c534e13a 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -34,11 +34,10 @@ extension KafkaProducerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { func didTerminate() { let action = self.stateMachine.withLockedValue { $0.finish() } switch action { - case .shutdownGracefullyAndFinishSource(let client, let source, let topicHandles): + case .shutdownGracefullyAndFinishSource(let client, let source): Task { await KafkaProducer._shutDownGracefully( client: client, - topicHandles: topicHandles, source: source, timeout: 10000 ) @@ -215,10 +214,9 @@ public final class KafkaProducer { public func shutdownGracefully(timeout: Int32 = 10000) async { let action = self.stateMachine.withLockedValue { $0.finish() } switch action { - case .shutdownGracefullyAndFinishSource(let client, let source, let topicHandles): + case .shutdownGracefullyAndFinishSource(let client, let source): await KafkaProducer._shutDownGracefully( client: client, - topicHandles: topicHandles, source: source, timeout: timeout ) @@ -230,7 +228,6 @@ public final class KafkaProducer { // Static so we perform this without needing a reference to `KafkaProducer` static func _shutDownGracefully( client: KafkaClient, - topicHandles: [String: OpaquePointer], source: Producer.Source?, timeout: Int32 ) async { @@ -243,10 +240,6 @@ public final class KafkaProducer { continuation.resume() } } - - for (_, topicHandle) in topicHandles { - rd_kafka_topic_destroy(topicHandle) - } } /// Start polling Kafka for acknowledged messages. @@ -277,76 +270,16 @@ public final class KafkaProducer { public func send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID { let action = try self.stateMachine.withLockedValue { try $0.send() } switch action { - case .send(let client, let newMessageID): - try self._send( - client: client, + case .send(let client, let newMessageID, let topicHandles): + try client.produce( message: message, - newMessageID: newMessageID + newMessageID: newMessageID, + topicConfig: self.topicConfig, + topicHandles: topicHandles ) return KafkaProducerMessageID(rawValue: newMessageID) } } - - private func _send( - client: KafkaClient, - message: KafkaProducerMessage, - newMessageID: UInt - ) throws { - let topicHandle = try self._createTopicHandleIfNeeded(topic: message.topic) - - let keyBytes: [UInt8]? - if var key = message.key { - keyBytes = key.readBytes(length: key.readableBytes) - } else { - keyBytes = nil - } - - let responseCode = message.value.withUnsafeReadableBytes { valueBuffer in - - // Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster. - // Returns 0 on success, error code otherwise. - return rd_kafka_produce( - topicHandle, - message.partition.rawValue, - RD_KAFKA_MSG_F_COPY, - UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress), - valueBuffer.count, - keyBytes, - keyBytes?.count ?? 0, - UnsafeMutableRawPointer(bitPattern: newMessageID) - ) - } - - guard responseCode == 0 else { - throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error()) - } - } - - /// Check `topicHandles` for a handle matching the topic name and create a new handle if needed. - /// - Parameter topic: The name of the topic that is addressed. - private func _createTopicHandleIfNeeded(topic: String) throws -> OpaquePointer? { - try self.stateMachine.withLockedValue { state in - let action = try state.createTopicHandleIfNeeded(topic: topic) - switch action { - case .handleExists(let handle): - return handle - case .createTopicHandle(let client, let topic): - let newHandle = try client.withKafkaHandlePointer { handle in - let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfig: self.topicConfig) - return rd_kafka_topic_new( - handle, - topic, - rdTopicConf - ) - // rd_kafka_topic_new deallocates topic config object - } - if let newHandle { - try state.addTopicHandle(topic: topic, handle: newHandle) - } - return newHandle - } - } - } } // MARK: - KafkaProducer + StateMachine @@ -367,12 +300,12 @@ extension KafkaProducer { /// - Parameter messageIDCounter:Used to incrementally assign unique IDs to messages. /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - /// - Parameter topicHandles: Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer. + /// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer. case started( client: KafkaClient, messageIDCounter: UInt, source: Producer.Source?, - topicHandles: [String: OpaquePointer] + topicHandles: RDKafkaTopicHandles ) /// The ``KafkaProducer`` has been shut down and cannot be used anymore. case finished @@ -394,7 +327,7 @@ extension KafkaProducer { client: client, messageIDCounter: 0, source: source, - topicHandles: [:] + topicHandles: RDKafkaTopicHandles(client: client) ) } @@ -421,55 +354,6 @@ extension KafkaProducer { } } - /// Action to take when wanting to create a topic handle. - enum CreateTopicHandleAction { - /// Do create a new topic handle. - case createTopicHandle( - client: KafkaClient, - topic: String - ) - /// No need to create a new handle. It exists already: `handle`. - case handleExists(handle: OpaquePointer) - } - - /// Returns action to be taken when wanting to create a new topic handle. - func createTopicHandleIfNeeded(topic: String) throws -> CreateTopicHandleAction { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .started(let client, _, _, let topicHandles): - if let handle = topicHandles[topic] { - return .handleExists(handle: handle) - } else { - return .createTopicHandle(client: client, topic: topic) - } - case .finished: - throw KafkaError.connectionClosed(reason: "Tried to create topic handle on closed connection") - } - } - - /// Add a newly created topic handle to the list of topic handles contained in the state machine. - mutating func addTopicHandle( - topic: String, - handle: OpaquePointer - ) throws { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .started(let client, let messageIDCounter, let source, let topicHandles): - var topicHandles = topicHandles - topicHandles[topic] = handle - self.state = .started( - client: client, - messageIDCounter: messageIDCounter, - source: source, - topicHandles: topicHandles - ) - case .finished: - throw KafkaError.connectionClosed(reason: "Tried to create topic handle on closed connection") - } - } - /// Action to be taken when wanting to send a message. enum SendAction { /// Send the message. @@ -477,7 +361,8 @@ extension KafkaProducer { /// - Important: `newMessageID` is the new message ID assigned to the message to be sent. case send( client: KafkaClient, - newMessageID: UInt + newMessageID: UInt, + topicHandles: RDKafkaTopicHandles ) } @@ -498,7 +383,8 @@ extension KafkaProducer { ) return .send( client: client, - newMessageID: newMessageID + newMessageID: newMessageID, + topicHandles: topicHandles ) case .finished: throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer") @@ -511,11 +397,9 @@ extension KafkaProducer { /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - /// - Parameter topicHandles: Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer. case shutdownGracefullyAndFinishSource( client: KafkaClient, - source: Producer.Source?, - topicHandles: [String: OpaquePointer] + source: Producer.Source? ) } @@ -527,12 +411,11 @@ extension KafkaProducer { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .started(let client, _, let source, let topicHandles): + case .started(let client, _, let source, _): self.state = .finished return .shutdownGracefullyAndFinishSource( client: client, - source: source, - topicHandles: topicHandles + source: source ) case .finished: return nil diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicHandles.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicHandles.swift new file mode 100644 index 00000000..8e213278 --- /dev/null +++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicHandles.swift @@ -0,0 +1,79 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-gsoc open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka + +/// Swift class that matches topic names with their respective `rd_kafka_topic_t` handles. +internal class RDKafkaTopicHandles { + private var _internal: [String: OpaquePointer] + + // Note: we retain the client to ensure it does not get + // deinitialized before rd_kafka_topic_destroy() is invoked (required) + private let client: KafkaClient + + init(client: KafkaClient) { + self._internal = [:] + self.client = client + } + + deinit { + for (_, topicHandle) in self._internal { + rd_kafka_topic_destroy(topicHandle) + } + } + + /// Scoped accessor that enables safe access to the pointer of the topic's handle. + /// - Warning: Do not escape the pointer from the closure for later use. + /// - Parameter topic: The name of the topic that is addressed. + /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. + /// - Parameter body: The closure will use the topic handle pointer. + @discardableResult + func withTopicHandlePointer( + topic: String, + topicConfig: KafkaTopicConfiguration, + _ body: (OpaquePointer) throws -> T + ) throws -> T { + let topicHandle = try self.createTopicHandleIfNeeded(topic: topic, topicConfig: topicConfig) + return try body(topicHandle) + } + + /// Check `topicHandles` for a handle matching the topic name and create a new handle if needed. + /// - Parameter topic: The name of the topic that is addressed. + private func createTopicHandleIfNeeded( + topic: String, + topicConfig: KafkaTopicConfiguration + ) throws -> OpaquePointer { + if let handle = self._internal[topic] { + return handle + } else { + let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfig: topicConfig) + let newHandle = self.client.withKafkaHandlePointer { kafkaHandle in + rd_kafka_topic_new( + kafkaHandle, + topic, + rdTopicConf + ) + // rd_kafka_topic_new deallocates topic config object + } + + guard let newHandle else { + // newHandle is nil, so we can retrieve error through rd_kafka_last_error() + let error = KafkaError.rdKafkaError(wrapping: rd_kafka_last_error()) + throw error + } + self._internal[topic] = newHandle + return newHandle + } + } +}