diff --git a/Package.swift b/Package.swift index 3f035d11..c9154d82 100644 --- a/Package.swift +++ b/Package.swift @@ -29,10 +29,10 @@ let rdkafkaExclude = [ let package = Package( name: "swift-kafka-gsoc", platforms: [ - .macOS(.v10_15), - .iOS(.v13), - .watchOS(.v6), - .tvOS(.v13), + .macOS(.v13), + .iOS(.v16), + .watchOS(.v9), + .tvOS(.v16), ], products: [ .library( diff --git a/README.md b/README.md index 5171dc46..425f01d2 100644 --- a/README.md +++ b/README.md @@ -11,24 +11,35 @@ The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can late ```swift let config = KafkaProducerConfig(bootstrapServers: ["localhost:9092"]) -let producer = try await KafkaProducer( +let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements( config: config, logger: .kafkaTest // Your logger here ) -let messageID = try await producer.sendAsync( - KafkaProducerMessage( - topic: "topic-name", - value: "Hello, World!" - ) -) +await withThrowingTaskGroup(of: Void.self) { group in -for await acknowledgement in producer.acknowledgements { - // Check if acknowledgement belongs to the sent message -} + // Run Task + group.addTask { + try await producer.run() + } -// Required -await producer.shutdownGracefully() + // Task receiving acknowledgements + group.addTask { + let messageID = try await producer.sendAsync( + KafkaProducerMessage( + topic: "topic-name", + value: "Hello, World!" + ) + ) + + for await acknowledgement in acknowledgements { + // Check if acknowledgement belongs to the sent message + } + + // Required + await producer.shutdownGracefully() + } +} ``` ### Consumer API diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index e01059cd..bfca9e7b 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -42,6 +42,20 @@ final class KafkaClient { rd_kafka_destroy(kafkaHandle) } + /// Polls the Kafka client for events. + /// + /// Events will cause application-provided callbacks to be called. + /// + /// - Parameter timeout: Specifies the maximum amount of time + /// (in milliseconds) that the call will block waiting for events. + /// For non-blocking calls, provide 0 as `timeout`. + /// To wait indefinitely for an event, provide -1. + /// - Returns: The number of events served. + @discardableResult + func poll(timeout: Int32) -> Int32 { + return rd_kafka_poll(self.kafkaHandle, timeout) + } + /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle. /// - Warning: Do not escape the pointer from the closure for later use. /// - Parameter body: The closure will use the Kafka handle pointer. diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index 5c450728..cb226406 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -121,6 +121,13 @@ public final class KafkaConsumer { highWatermark: highWatermark ) + // (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt) + // This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer. + // The source MUST be held by the caller and used to signal new elements or finish. + // The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller. + // This is due to the fact that deiniting the sequence is used as part of a trigger to + // terminate the underlying source. + // TODO: make self delegate to avoid weak reference here let messagesSequenceDelegate = ConsumerMessagesAsyncSequenceDelegate { [weak self] in self?.produceMore() } didTerminateClosure: { [weak self] in diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 64a64483..0efc6c95 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -16,27 +16,15 @@ import Crdkafka import Logging import NIOCore -/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true. -struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy { - func didYield(bufferDepth: Int) -> Bool { true } - func didConsume(bufferDepth: Int) -> Bool { true } -} - -/// `NIOAsyncSequenceProducerDelegate` that does nothing. -struct NoDelegate: NIOAsyncSequenceProducerDelegate { - func produceMore() {} - func didTerminate() {} -} - /// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``). -public struct AcknowledgedMessagesAsyncSequence: AsyncSequence { +public struct KafkaMessageAcknowledgements: AsyncSequence { public typealias Element = Result - typealias WrappedSequence = NIOAsyncSequenceProducer + typealias WrappedSequence = AsyncStream let wrappedSequence: WrappedSequence /// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``). public struct AcknowledgedMessagesAsyncIterator: AsyncIteratorProtocol { - let wrappedIterator: NIOAsyncSequenceProducer.AsyncIterator + var wrappedIterator: AsyncStream.AsyncIterator public mutating func next() async -> Element? { await self.wrappedIterator.next() @@ -77,65 +65,110 @@ public actor KafkaProducer { /// Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer. private var topicHandles: [String: OpaquePointer] - // We use implicitly unwrapped optionals here as these properties need to access self upon initialization /// Used for handling the connection to the Kafka cluster. - private var client: KafkaClient! - /// Task that polls the Kafka cluster for updates periodically. - private var pollTask: Task! - - /// `AsyncSequence` that returns all ``KafkaProducerMessage`` objects that have been - /// acknowledged by the Kafka cluster. - public nonisolated let acknowledgements: AcknowledgedMessagesAsyncSequence - nonisolated let acknowlegdementsSource: AcknowledgedMessagesAsyncSequence.WrappedSequence.Source - private typealias Acknowledgement = Result + private let client: KafkaClient + // Private initializer, use factory methods to create KafkaProducer /// Initialize a new ``KafkaProducer``. /// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``. /// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics. /// - Parameter logger: A logger. - /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. - public init( - config: KafkaProducerConfig = KafkaProducerConfig(), - topicConfig: KafkaTopicConfig = KafkaTopicConfig(), + /// - Throws: A ``KafkaError`` if initializing the producer failed. + private init( + client: KafkaClient, + topicConfig: KafkaTopicConfig, logger: Logger ) async throws { + self.client = client self.topicConfig = topicConfig - self.logger = logger self.topicHandles = [:] + self.logger = logger self.state = .started + } - // (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt) - // This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer. - // The source MUST be held by the caller and used to signal new elements or finish. - // The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller. - // This is due to the fact that deiniting the sequence is used as part of a trigger to - // terminate the underlying source. - let acknowledgementsSourceAndSequence = NIOAsyncSequenceProducer.makeSequence( - elementType: Acknowledgement.self, - backPressureStrategy: NoBackPressure(), - delegate: NoDelegate() + /// Initialize a new ``KafkaProducer``. + /// + /// This factory method creates a producer without message acknowledgements. + /// + /// - Parameter configuration: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``. + /// - Parameter topicConfiguration: The ``KafkaTopicConfig`` used for newly created topics. + /// - Parameter logger: A logger. + /// - Returns: The newly created ``KafkaProducer``. + /// - Throws: A ``KafkaError`` if initializing the producer failed. + public static func makeProducer( + config: KafkaProducerConfig = KafkaProducerConfig(), + topicConfig: KafkaTopicConfig = KafkaTopicConfig(), + logger: Logger + ) async throws -> KafkaProducer { + let client = try RDKafka.createClient( + type: .producer, + configDictionary: config.dictionary, + // Having no callback will discard any incoming acknowledgement messages + // Ref: rdkafka_broker.c:rd_kafka_dr_msgq + callback: nil, + logger: logger ) - self.acknowlegdementsSource = acknowledgementsSourceAndSequence.source - self.acknowledgements = AcknowledgedMessagesAsyncSequence( - wrappedSequence: acknowledgementsSourceAndSequence.sequence + + let producer = try await KafkaProducer( + client: client, + topicConfig: topicConfig, + logger: logger ) - self.client = try RDKafka.createClient( + return producer + } + + /// Initialize a new ``KafkaProducer`` and a ``KafkaMessageAcknowledgements`` asynchronous sequence. + /// + /// Use the asynchronous sequence to consume message acknowledgements. + /// + /// - Important: When the asynchronous sequence is deinited the producer will be shutdown. + /// + /// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``. + /// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics. + /// - Parameter logger: A logger. + /// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements`` + /// `AsyncSequence` used for receiving message acknowledgements. + /// - Throws: A ``KafkaError`` if initializing the producer failed. + public static func makeProducerWithAcknowledgements( + config: KafkaProducerConfig = KafkaProducerConfig(), + topicConfig: KafkaTopicConfig = KafkaTopicConfig(), + logger: Logger + ) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) { + var streamContinuation: AsyncStream>.Continuation? + let stream = AsyncStream { continuation in + streamContinuation = continuation + } + + let client = try RDKafka.createClient( type: .producer, configDictionary: config.dictionary, - callback: self.deliveryReportCallback, - logger: self.logger + callback: { [logger, streamContinuation] messageResult in + guard let messageResult else { + logger.error("Could not resolve acknowledged message") + return + } + + // Ignore YieldResult as we don't support back pressure in KafkaProducer + streamContinuation?.yield(messageResult) + }, + logger: logger ) - // Poll Kafka every millisecond - self.pollTask = Task { [client] in - while !Task.isCancelled { - client?.withKafkaHandlePointer { handle in - rd_kafka_poll(handle, 0) - } - try? await Task.sleep(nanoseconds: 1_000_000) + let producer = try await KafkaProducer( + client: client, + topicConfig: topicConfig, + logger: logger + ) + + streamContinuation?.onTermination = { [producer] _ in + Task { + await producer.shutdownGracefully() } } + + let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: stream) + return (producer, acknowlegementsSequence) } /// Method to shutdown the ``KafkaProducer``. @@ -155,7 +188,7 @@ public actor KafkaProducer { private func _shutDownGracefully(timeout: Int32) async { await withCheckedContinuation { (continuation: CheckedContinuation) in - // Wait 10 seconds for outstanding messages to be sent and callbacks to be called + // Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called self.client.withKafkaHandlePointer { handle in rd_kafka_flush(handle, timeout) continuation.resume() @@ -165,11 +198,22 @@ public actor KafkaProducer { for (_, topicHandle) in self.topicHandles { rd_kafka_topic_destroy(topicHandle) } - self.pollTask.cancel() self.state = .shutDown } + /// Start polling Kafka for acknowledged messages. + /// + /// - Parameter pollInterval: The desired time interval between two consecutive polls. + /// - Returns: An awaitable task representing the execution of the poll loop. + public func run(pollInterval: Duration = .milliseconds(100)) async throws { + // TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle) + while self.state == .started { + self.client.poll(timeout: 0) + try await Task.sleep(for: pollInterval) + } + } + /// Send messages to the Kafka cluster asynchronously, aka "fire and forget". /// This function is non-blocking. /// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster. @@ -220,29 +264,6 @@ public actor KafkaProducer { return self.messageIDCounter } - // Closure that is executed when a message has been acknowledged by Kafka - private lazy var deliveryReportCallback: (UnsafePointer?) -> Void = { [logger, acknowlegdementsSource] messagePointer in - guard let messagePointer = messagePointer else { - logger.error("Could not resolve acknowledged message") - return - } - - let messageID = UInt(bitPattern: messagePointer.pointee._private) - - do { - let message = try KafkaAcknowledgedMessage(messagePointer: messagePointer, id: messageID) - _ = acknowlegdementsSource.yield(.success(message)) - } catch { - guard let error = error as? KafkaAcknowledgedMessageError else { - fatalError("Caught error that is not of type \(KafkaAcknowledgedMessageError.self)") - } - _ = acknowlegdementsSource.yield(.failure(error)) - } - - // The messagePointer is automatically destroyed by librdkafka - // For safety reasons, we only use it inside of this closure - } - /// Check `topicHandles` for a handle matching the topic name and create a new handle if needed. /// - Parameter topic: The name of the topic that is addressed. private func createTopicHandleIfNeeded(topic: String) throws -> OpaquePointer? { diff --git a/Sources/SwiftKafka/RDKafka/RDKafka.swift b/Sources/SwiftKafka/RDKafka/RDKafka.swift index ad1e6f0a..6c189c33 100644 --- a/Sources/SwiftKafka/RDKafka/RDKafka.swift +++ b/Sources/SwiftKafka/RDKafka/RDKafka.swift @@ -27,7 +27,7 @@ struct RDKafka { static func createClient( type: ClientType, configDictionary: [String: String], - callback: ((UnsafePointer?) -> Void)? = nil, + callback: ((RDKafkaConfig.KafkaAcknowledgementResult?) -> Void)? = nil, logger: Logger ) throws -> KafkaClient { let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift b/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift index 14c519bd..6fd20580 100644 --- a/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift +++ b/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift @@ -16,9 +16,10 @@ import Crdkafka /// A collection of helper functions wrapping common `rd_kafka_conf_*` functions in Swift. struct RDKafkaConfig { + typealias KafkaAcknowledgementResult = Result /// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`. final class CapturedClosure { - typealias Closure = (UnsafePointer?) -> Void + typealias Closure = (KafkaAcknowledgementResult?) -> Void let closure: Closure init(_ closure: @escaping Closure) { @@ -69,7 +70,7 @@ struct RDKafkaConfig { /// - Returns: A ``CapturedClosure`` object that must me retained by the caller as long as acknowledgements are received. static func setDeliveryReportCallback( configPointer: OpaquePointer, - _ callback: @escaping ((UnsafePointer?) -> Void) + _ callback: @escaping ((KafkaAcknowledgementResult?) -> Void) ) -> CapturedClosure { let capturedClosure = CapturedClosure(callback) // Pass the captured closure to the C closure as an opaque object @@ -83,14 +84,17 @@ struct RDKafkaConfig { let callbackWrapper: ( @convention(c) (OpaquePointer?, UnsafePointer?, UnsafeMutableRawPointer?) -> Void ) = { _, messagePointer, opaquePointer in - guard let opaquePointer = opaquePointer else { fatalError("Could not resolve reference to KafkaProducer instance") } let opaque = Unmanaged.fromOpaque(opaquePointer).takeUnretainedValue() let actualCallback = opaque.closure - actualCallback(messagePointer) + let messageResult = Self.convertMessageToAcknowledgementResult(messagePointer: messagePointer) + actualCallback(messageResult) + + // The messagePointer is automatically destroyed by librdkafka + // For safety reasons, we only use it inside of this callback } rd_kafka_conf_set_dr_msg_cb( @@ -100,4 +104,30 @@ struct RDKafkaConfig { return capturedClosure } + + /// Convert an unsafe`rd_kafka_message_t` object to a safe ``KafkaAcknowledgementResult``. + /// - Parameter messagePointer: An `UnsafePointer` pointing to the `rd_kafka_message_t` object in memory. + /// - Returns: A ``KafkaAcknowledgementResult``. + private static func convertMessageToAcknowledgementResult( + messagePointer: UnsafePointer? + ) -> KafkaAcknowledgementResult? { + guard let messagePointer else { + return nil + } + + let messageID = UInt(bitPattern: messagePointer.pointee._private) + + let messageResult: KafkaAcknowledgementResult + do { + let message = try KafkaAcknowledgedMessage(messagePointer: messagePointer, id: messageID) + messageResult = .success(message) + } catch { + guard let error = error as? KafkaAcknowledgedMessageError else { + fatalError("Caught error that is not of type \(KafkaAcknowledgedMessageError.self)") + } + messageResult = .failure(error) + } + + return messageResult + } } diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift index d15eea76..1a84e7dc 100644 --- a/Tests/IntegrationTests/SwiftKafkaTests.swift +++ b/Tests/IntegrationTests/SwiftKafkaTests.swift @@ -73,143 +73,189 @@ final class SwiftKafkaTests: XCTestCase { } func testProduceAndConsumeWithConsumerGroup() async throws { - let producer = try await KafkaProducer(config: producerConfig, logger: .kafkaTest) + let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) + let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) - let consumerConfig = KafkaConsumerConfig( - consumptionStrategy: .group(groupID: "subscription-test-group-id", topics: [uniqueTestTopic]), - autoOffsetReset: .beginning, // Always read topics from beginning - bootstrapServers: [self.bootstrapServer], - brokerAddressFamily: .v4 - ) - - let consumer = try KafkaConsumer( - config: consumerConfig, - logger: .kafkaTest - ) - - let testMessages = Self.creataTestMessages(topic: self.uniqueTestTopic, count: 10) - try await Self.sendAndAcknowledgeMessages(producer: producer, messages: testMessages) - - var consumedMessages = [KafkaConsumerMessage]() - for await messageResult in consumer.messages { - guard case .success(let message) = messageResult else { - continue + await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await producer.run() } - consumedMessages.append(message) - if consumedMessages.count >= testMessages.count { - break + // Producer Task + group.addTask { + try await Self.sendAndAcknowledgeMessages( + producer: producer, + acknowledgements: acks, + messages: testMessages + ) + await producer.shutdownGracefully() } - } - - XCTAssertEqual(testMessages.count, consumedMessages.count) - for (index, consumedMessage) in consumedMessages.enumerated() { - XCTAssertEqual(testMessages[index].topic, consumedMessage.topic) - XCTAssertEqual(testMessages[index].key, consumedMessage.key) - XCTAssertEqual(testMessages[index].value, consumedMessage.value) + // Consumer Task + group.addTask { + let consumerConfig = KafkaConsumerConfig( + consumptionStrategy: .group(groupID: "subscription-test-group-id", topics: [self.uniqueTestTopic]), + autoOffsetReset: .beginning, // Always read topics from beginning + bootstrapServers: [self.bootstrapServer], + brokerAddressFamily: .v4 + ) + + let consumer = try KafkaConsumer( + config: consumerConfig, + logger: .kafkaTest + ) + + var consumedMessages = [KafkaConsumerMessage]() + for await messageResult in consumer.messages { + guard case .success(let message) = messageResult else { + continue + } + consumedMessages.append(message) + + if consumedMessages.count >= testMessages.count { + break + } + } + + XCTAssertEqual(testMessages.count, consumedMessages.count) + + for (index, consumedMessage) in consumedMessages.enumerated() { + XCTAssertEqual(testMessages[index].topic, consumedMessage.topic) + XCTAssertEqual(testMessages[index].key, consumedMessage.key) + XCTAssertEqual(testMessages[index].value, consumedMessage.value) + } + } } - - await producer.shutdownGracefully() } func testProduceAndConsumeWithAssignedTopicPartition() async throws { - let producer = try await KafkaProducer(config: producerConfig, logger: .kafkaTest) - - let consumerConfig = KafkaConsumerConfig( - consumptionStrategy: .partition( - topic: uniqueTestTopic, - partition: KafkaPartition(rawValue: 0), - offset: 0 - ), - autoOffsetReset: .beginning, // Always read topics from beginning - bootstrapServers: [self.bootstrapServer], - brokerAddressFamily: .v4 - ) + let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) + let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) - let consumer = try KafkaConsumer( - config: consumerConfig, - logger: .kafkaTest - ) - - let testMessages = Self.creataTestMessages(topic: self.uniqueTestTopic, count: 10) - try await Self.sendAndAcknowledgeMessages(producer: producer, messages: testMessages) - - var consumedMessages = [KafkaConsumerMessage]() - for await messageResult in consumer.messages { - guard case .success(let message) = messageResult else { - continue + await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await producer.run() } - consumedMessages.append(message) - if consumedMessages.count >= testMessages.count { - break + // Producer Task + group.addTask { + try await Self.sendAndAcknowledgeMessages( + producer: producer, + acknowledgements: acks, + messages: testMessages + ) + await producer.shutdownGracefully() } - } - - XCTAssertEqual(testMessages.count, consumedMessages.count) - for (index, consumedMessage) in consumedMessages.enumerated() { - XCTAssertEqual(testMessages[index].topic, consumedMessage.topic) - XCTAssertEqual(testMessages[index].key, consumedMessage.key) - XCTAssertEqual(testMessages[index].value, consumedMessage.value) + // Consumer Task + group.addTask { + let consumerConfig = KafkaConsumerConfig( + consumptionStrategy: .partition( + topic: self.uniqueTestTopic, + partition: KafkaPartition(rawValue: 0), + offset: 0 + ), + autoOffsetReset: .beginning, // Always read topics from beginning + bootstrapServers: [self.bootstrapServer], + brokerAddressFamily: .v4 + ) + + let consumer = try KafkaConsumer( + config: consumerConfig, + logger: .kafkaTest + ) + + var consumedMessages = [KafkaConsumerMessage]() + for await messageResult in consumer.messages { + guard case .success(let message) = messageResult else { + continue + } + consumedMessages.append(message) + + if consumedMessages.count >= testMessages.count { + break + } + } + + XCTAssertEqual(testMessages.count, consumedMessages.count) + + for (index, consumedMessage) in consumedMessages.enumerated() { + XCTAssertEqual(testMessages[index].topic, consumedMessage.topic) + XCTAssertEqual(testMessages[index].key, consumedMessage.key) + XCTAssertEqual(testMessages[index].value, consumedMessage.value) + } + } } - - await producer.shutdownGracefully() } func testProduceAndConsumeWithCommitSync() async throws { - let producer = try await KafkaProducer(config: producerConfig, logger: .kafkaTest) - - let consumerConfig = KafkaConsumerConfig( - consumptionStrategy: .group(groupID: "commit-sync-test-group-id", topics: [uniqueTestTopic]), - enableAutoCommit: false, - autoOffsetReset: .beginning, // Always read topics from beginning - bootstrapServers: [self.bootstrapServer], - brokerAddressFamily: .v4 - ) - - let consumer = try KafkaConsumer( - config: consumerConfig, - logger: .kafkaTest - ) - - let testMessages = Self.creataTestMessages(topic: self.uniqueTestTopic, count: 10) - try await Self.sendAndAcknowledgeMessages(producer: producer, messages: testMessages) + let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) + let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) - var consumedMessages = [KafkaConsumerMessage]() - for await messageResult in consumer.messages { - guard case .success(let message) = messageResult else { - continue + await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await producer.run() } - consumedMessages.append(message) - try await consumer.commitSync(message) - if consumedMessages.count >= testMessages.count { - break + // Producer Task + group.addTask { + try await Self.sendAndAcknowledgeMessages( + producer: producer, + acknowledgements: acks, + messages: testMessages + ) + await producer.shutdownGracefully() } - } - - XCTAssertEqual(testMessages.count, consumedMessages.count) - await producer.shutdownGracefully() - - // Additionally test that commit does not work on closed consumer - do { - guard let consumedMessage = consumedMessages.first else { - XCTFail("No messages consumed") - return + // Consumer Task + group.addTask { + let consumerConfig = KafkaConsumerConfig( + consumptionStrategy: .group(groupID: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]), + enableAutoCommit: false, + autoOffsetReset: .beginning, // Always read topics from beginning + bootstrapServers: [self.bootstrapServer], + brokerAddressFamily: .v4 + ) + + let consumer = try KafkaConsumer( + config: consumerConfig, + logger: .kafkaTest + ) + + var consumedMessages = [KafkaConsumerMessage]() + for await messageResult in consumer.messages { + guard case .success(let message) = messageResult else { + continue + } + consumedMessages.append(message) + try await consumer.commitSync(message) + + if consumedMessages.count >= testMessages.count { + break + } + } + + XCTAssertEqual(testMessages.count, consumedMessages.count) + + // Additionally test that commit does not work on closed consumer + do { + guard let consumedMessage = consumedMessages.first else { + XCTFail("No messages consumed") + return + } + try await consumer.commitSync(consumedMessage) + XCTFail("Invoking commitSync on closed consumer should have failed") + } catch {} } - try await consumer.commitSync(consumedMessage) - XCTFail("Invoking commitSync on closed consumer should have failed") - } catch {} + } } - // TODO: also test concurrently? - // MARK: - Helpers - private static func creataTestMessages(topic: String, count: UInt) -> [KafkaProducerMessage] { + private static func createTestMessages(topic: String, count: UInt) -> [KafkaProducerMessage] { return Array(0..() @@ -231,7 +278,7 @@ final class SwiftKafkaTests: XCTestCase { var acknowledgedMessages = Set() - for await messageResult in producer.acknowledgements { + for await messageResult in acknowledgements { guard case .success(let acknowledgedMessage) = messageResult else { XCTFail() return diff --git a/Tests/SwiftKafkaTests/KafkaProducerTests.swift b/Tests/SwiftKafkaTests/KafkaProducerTests.swift index ce061989..507fc107 100644 --- a/Tests/SwiftKafkaTests/KafkaProducerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaProducerTests.swift @@ -52,129 +52,182 @@ final class KafkaProducerTests: XCTestCase { } func testSendAsync() async throws { - let producer = try await KafkaProducer(config: config, logger: .kafkaTest) + let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) - let expectedTopic = "test-topic" - let message = KafkaProducerMessage( - topic: expectedTopic, - key: "key", - value: "Hello, World!" - ) - - let messageID = try await producer.sendAsync(message) + await withThrowingTaskGroup(of: Void.self) { group in - for await messageResult in producer.acknowledgements { - guard case .success(let acknowledgedMessage) = messageResult else { - XCTFail() - return + // Run Task + group.addTask { + try await producer.run() } - XCTAssertEqual(messageID, acknowledgedMessage.id) - XCTAssertEqual(expectedTopic, acknowledgedMessage.topic) - XCTAssertEqual(message.key, acknowledgedMessage.key) - XCTAssertEqual(message.value, acknowledgedMessage.value) - break + // Test Task + group.addTask { + let expectedTopic = "test-topic" + let message = KafkaProducerMessage( + topic: expectedTopic, + key: "key", + value: "Hello, World!" + ) + + let messageID = try await producer.sendAsync(message) + + for await messageResult in acks { + guard case .success(let acknowledgedMessage) = messageResult else { + XCTFail() + return + } + + XCTAssertEqual(messageID, acknowledgedMessage.id) + XCTAssertEqual(expectedTopic, acknowledgedMessage.topic) + XCTAssertEqual(message.key, acknowledgedMessage.key) + XCTAssertEqual(message.value, acknowledgedMessage.value) + break + } + + await producer.shutdownGracefully() + } } - - await producer.shutdownGracefully() } func testSendAsyncEmptyMessage() async throws { - let producer = try await KafkaProducer(config: config, logger: .kafkaTest) - - let expectedTopic = "test-topic" - let message = KafkaProducerMessage( - topic: expectedTopic, - value: ByteBuffer() - ) + let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) - let messageID = try await producer.sendAsync(message) + await withThrowingTaskGroup(of: Void.self) { group in - for await messageResult in producer.acknowledgements { - guard case .success(let acknowledgedMessage) = messageResult else { - XCTFail() - return + // Run Task + group.addTask { + try await producer.run() } - XCTAssertEqual(messageID, acknowledgedMessage.id) - XCTAssertEqual(expectedTopic, acknowledgedMessage.topic) - XCTAssertEqual(message.key, acknowledgedMessage.key) - XCTAssertEqual(message.value, acknowledgedMessage.value) - break + // Test Task + group.addTask { + let expectedTopic = "test-topic" + let message = KafkaProducerMessage( + topic: expectedTopic, + value: ByteBuffer() + ) + + let messageID = try await producer.sendAsync(message) + + for await messageResult in acks { + guard case .success(let acknowledgedMessage) = messageResult else { + XCTFail() + return + } + + XCTAssertEqual(messageID, acknowledgedMessage.id) + XCTAssertEqual(expectedTopic, acknowledgedMessage.topic) + XCTAssertEqual(message.key, acknowledgedMessage.key) + XCTAssertEqual(message.value, acknowledgedMessage.value) + break + } + + await producer.shutdownGracefully() + } } - - await producer.shutdownGracefully() } func testSendAsyncTwoTopics() async throws { - let producer = try await KafkaProducer(config: config, logger: .kafkaTest) - - let message1 = KafkaProducerMessage( - topic: "test-topic1", - key: "key1", - value: "Hello, Munich!" - ) - let message2 = KafkaProducerMessage( - topic: "test-topic2", - key: "key2", - value: "Hello, London!" - ) + let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + await withThrowingTaskGroup(of: Void.self) { group in - var messageIDs = Set() - - messageIDs.insert(try await producer.sendAsync(message1)) - messageIDs.insert(try await producer.sendAsync(message2)) - - var acknowledgedMessages = Set() - - for await messageResult in producer.acknowledgements { - guard case .success(let acknowledgedMessage) = messageResult else { - XCTFail() - return + // Run Task + group.addTask { + try await producer.run() } - acknowledgedMessages.insert(acknowledgedMessage) - - if acknowledgedMessages.count >= 2 { - break + // Test Task + group.addTask { + let message1 = KafkaProducerMessage( + topic: "test-topic1", + key: "key1", + value: "Hello, Munich!" + ) + let message2 = KafkaProducerMessage( + topic: "test-topic2", + key: "key2", + value: "Hello, London!" + ) + + var messageIDs = Set() + + messageIDs.insert(try await producer.sendAsync(message1)) + messageIDs.insert(try await producer.sendAsync(message2)) + + var acknowledgedMessages = Set() + + for await messageResult in acks { + guard case .success(let acknowledgedMessage) = messageResult else { + XCTFail() + return + } + + acknowledgedMessages.insert(acknowledgedMessage) + + if acknowledgedMessages.count >= 2 { + break + } + } + + XCTAssertEqual(2, acknowledgedMessages.count) + XCTAssertEqual(acknowledgedMessages.map(\.id).sorted(), messageIDs.sorted()) + XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message1.topic })) + XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message2.topic })) + XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == message1.key })) + XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == message2.key })) + XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message1.value })) + XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message2.value })) + + await producer.shutdownGracefully() } } - - XCTAssertEqual(2, acknowledgedMessages.count) - XCTAssertEqual(acknowledgedMessages.map(\.id).sorted(), messageIDs.sorted()) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message1.topic })) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message2.topic })) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == message1.key })) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == message2.key })) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message1.value })) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message2.value })) - - await producer.shutdownGracefully() } func testProducerNotUsableAfterShutdown() async throws { - let producer = try await KafkaProducer(config: config, logger: .kafkaTest) + let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) await producer.shutdownGracefully() - let message = KafkaProducerMessage( - topic: "test-topic", - value: "Hello, World!" - ) + await withThrowingTaskGroup(of: Void.self) { group in + + // Run Task + group.addTask { + try await producer.run() + } - do { - try await producer.sendAsync(message) - XCTFail("Method should have thrown error") - } catch {} + // Test Task + group.addTask { + let message = KafkaProducerMessage( + topic: "test-topic", + value: "Hello, World!" + ) + + do { + try await producer.sendAsync(message) + XCTFail("Method should have thrown error") + } catch {} + + // This subscribes to the acknowledgements stream and immediately terminates the stream. + // Required to kill the run task. + var iterator: KafkaMessageAcknowledgements.AsyncIterator? = acks.makeAsyncIterator() + _ = iterator + iterator = nil + } + } } func testNoMemoryLeakAfterShutdown() async throws { var producer: KafkaProducer? - producer = try await KafkaProducer(config: self.config, logger: .kafkaTest) + var acks: KafkaMessageAcknowledgements? + (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + _ = acks weak var producerCopy = producer await producer?.shutdownGracefully() producer = nil + // Make sure to terminate the AsyncSequence + acks = nil // Wait for rd_kafka_flush to complete try await Task.sleep(nanoseconds: 10 * 1_000_000_000)