diff --git a/README.md b/README.md index c559db92..99169719 100644 --- a/README.md +++ b/README.md @@ -25,12 +25,46 @@ Finally, add `import SwiftKafka` to your source code. ### Producer API +After creating the `KafkaProducer`, messages can be sent to a `topic` using the `send(_:)` method. + +```swift +let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"]) + +let producer = try KafkaProducer.makeProducer( + config: config, + logger: .kafkaTest // Your logger here +) + +await withThrowingTaskGroup(of: Void.self) { group in + + // Run Task + group.addTask { + try await producer.run() + } + + // Task sending messages + group.addTask { + try producer.send( + KafkaProducerMessage( + topic: "topic-name", + value: "Hello, World!" + ) + ) + + // Required + await producer.shutdownGracefully() + } +} +``` + +#### Acknowledgements + The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error. ```swift let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"]) -let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements( +let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements( config: config, logger: .kafkaTest // Your logger here ) @@ -42,9 +76,9 @@ await withThrowingTaskGroup(of: Void.self) { group in try await producer.run() } - // Task receiving acknowledgements + // Task sending messages and receiving acknowledgements group.addTask { - let messageID = try await producer.send( + let messageID = try producer.send( KafkaProducerMessage( topic: "topic-name", value: "Hello, World!" 
@@ -55,8 +89,7 @@ await withThrowingTaskGroup(of: Void.self) { group in // Check if acknowledgement belongs to the sent message } - // Required - await producer.shutdownGracefully() + // The producer shuts down automatically after consuming the acknowledgements } } ``` diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 88bdc7d2..3c322a15 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -14,17 +14,56 @@ import Crdkafka import Logging +import NIOConcurrencyHelpers import NIOCore +/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true. +struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy { + func didYield(bufferDepth: Int) -> Bool { true } + func didConsume(bufferDepth: Int) -> Bool { true } +} + +// MARK: - ShutDownOnTerminate + +/// `NIOAsyncSequenceProducerDelegate` that shuts the producer down when +/// `didTerminate()` is invoked. +struct ShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored property is protected by a lock + let stateMachine: NIOLockedValueBox +} + +extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { + func produceMore() { + // No back pressure + return + } + + func didTerminate() { + let action = self.stateMachine.withLockedValue { $0.finish() } + switch action { + case .shutdownGracefullyAndFinishSource(let client, let source, let topicHandles): + Task { + await KafkaProducer._shutDownGracefully( + client: client, + topicHandles: topicHandles, + source: source, + timeout: 10000 + ) + } + case .none: + return + } + } +} + +/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
public struct KafkaMessageAcknowledgements: AsyncSequence { public typealias Element = Result - typealias WrappedSequence = AsyncStream + typealias WrappedSequence = NIOAsyncSequenceProducer let wrappedSequence: WrappedSequence /// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``). public struct AcknowledgedMessagesAsyncIterator: AsyncIteratorProtocol { - var wrappedIterator: AsyncStream.AsyncIterator + var wrappedIterator: WrappedSequence.AsyncIterator public mutating func next() async -> Element? { await self.wrappedIterator.next() @@ -40,33 +79,18 @@ public struct KafkaMessageAcknowledgements: AsyncSequence { /// Please make sure to explicitly call ``shutdownGracefully(timeout:)`` when the ``KafkaProducer`` is not used anymore. /// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfiguration`` /// configuration object (only works if server has `auto.create.topics.enable` property set). -public actor KafkaProducer { - /// States that the ``KafkaProducer`` can have. - private enum State { - /// The ``KafkaProducer`` has started and is ready to use. - case started - /// ``KafkaProducer/shutdownGracefully()`` has been invoked and the ``KafkaProducer`` - /// is in the process of receiving all outstanding acknowlegements and shutting down. - case shuttingDown - /// The ``KafkaProducer`` has been shut down and cannot be used anymore. - case shutDown - } +public final class KafkaProducer { + typealias Producer = NIOAsyncSequenceProducer< + Result, + NoBackPressure, + ShutdownOnTerminate + > /// State of the ``KafkaProducer``. - private var state: State + private let stateMachine: NIOLockedValueBox - /// Counter that is used to assign each message a unique ID. - /// Every time a new message is sent to the Kafka cluster, the counter is increased by one. 
- private var messageIDCounter: UInt = 0 - /// The ``TopicConfiguration`` used for newly created topics. + /// Topic configuration that is used when a new topic has to be created by the producer. private let topicConfig: KafkaTopicConfiguration - /// A logger. - private let logger: Logger - /// Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer. - private var topicHandles: [String: OpaquePointer] - - /// Used for handling the connection to the Kafka cluster. - private let client: KafkaClient // Private initializer, use factory methods to create KafkaProducer /// Initialize a new ``KafkaProducer``. @@ -75,15 +99,11 @@ public actor KafkaProducer { /// - Parameter logger: A logger. /// - Throws: A ``KafkaError`` if initializing the producer failed. private init( - client: KafkaClient, - topicConfig: KafkaTopicConfiguration, - logger: Logger - ) async throws { - self.client = client + stateMachine: NIOLockedValueBox, + topicConfig: KafkaTopicConfiguration + ) throws { + self.stateMachine = stateMachine self.topicConfig = topicConfig - self.topicHandles = [:] - self.logger = logger - self.state = .started } /// Initialize a new ``KafkaProducer``. @@ -91,7 +111,7 @@ public actor KafkaProducer { /// This factory method creates a producer without message acknowledgements. /// /// - Parameter configuration: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``. - /// - Parameter topicConfiguration: The ``KafkaTopicConfiguration`` used for newly created topics. + /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. /// - Parameter logger: A logger. /// - Returns: The newly created ``KafkaProducer``. /// - Throws: A ``KafkaError`` if initializing the producer failed. 
@@ -99,7 +119,9 @@ public actor KafkaProducer { config: KafkaProducerConfiguration = KafkaProducerConfiguration(), topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(), logger: Logger - ) async throws -> KafkaProducer { + ) throws -> KafkaProducer { + let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) + let client = try RDKafka.createClient( type: .producer, configDictionary: config.dictionary, @@ -109,12 +131,18 @@ public actor KafkaProducer { logger: logger ) - let producer = try await KafkaProducer( - client: client, - topicConfig: topicConfig, - logger: logger + let producer = try KafkaProducer( + stateMachine: stateMachine, + topicConfig: topicConfig ) + stateMachine.withLockedValue { + $0.initialize( + client: client, + source: nil + ) + } + return producer } @@ -134,40 +162,44 @@ public actor KafkaProducer { config: KafkaProducerConfiguration = KafkaProducerConfiguration(), topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(), logger: Logger - ) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) { - var streamContinuation: AsyncStream>.Continuation? 
- let stream = AsyncStream { continuation in - streamContinuation = continuation - } + ) throws -> (KafkaProducer, KafkaMessageAcknowledgements) { + let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) + + let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( + elementType: Result.self, + backPressureStrategy: NoBackPressure(), + delegate: ShutdownOnTerminate(stateMachine: stateMachine) + ) + let source = sourceAndSequence.source let client = try RDKafka.createClient( type: .producer, configDictionary: config.dictionary, - callback: { [logger, streamContinuation] messageResult in + callback: { [logger, source] messageResult in guard let messageResult else { logger.error("Could not resolve acknowledged message") return } // Ignore YieldResult as we don't support back pressure in KafkaProducer - streamContinuation?.yield(messageResult) + _ = source.yield(messageResult) }, logger: logger ) - let producer = try await KafkaProducer( - client: client, - topicConfig: topicConfig, - logger: logger + let producer = try KafkaProducer( + stateMachine: stateMachine, + topicConfig: topicConfig ) - streamContinuation?.onTermination = { [producer] _ in - Task { - await producer.shutdownGracefully() - } + stateMachine.withLockedValue { + $0.initialize( + client: client, + source: source + ) } - let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: stream) + let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: sourceAndSequence.sequence) return (producer, acknowlegementsSequence) } @@ -177,29 +209,40 @@ public actor KafkaProducer { /// Afterwards, it shuts down the connection to Kafka and cleans any remaining state up. /// - Parameter timeout: Maximum amount of milliseconds this method waits for any outstanding messages to be sent. 
public func shutdownGracefully(timeout: Int32 = 10000) async { - switch self.state { - case .started: - self.state = .shuttingDown - await self._shutDownGracefully(timeout: timeout) - case .shuttingDown, .shutDown: + let action = self.stateMachine.withLockedValue { $0.finish() } + switch action { + case .shutdownGracefullyAndFinishSource(let client, let source, let topicHandles): + await KafkaProducer._shutDownGracefully( + client: client, + topicHandles: topicHandles, + source: source, + timeout: timeout + ) + case .none: return } } - private func _shutDownGracefully(timeout: Int32) async { + // Static so we perform this without needing a reference to `KafkaProducer` + static func _shutDownGracefully( + client: KafkaClient, + topicHandles: [String: OpaquePointer], + source: Producer.Source?, + timeout: Int32 + ) async { + source?.finish() + await withCheckedContinuation { (continuation: CheckedContinuation) in // Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called - self.client.withKafkaHandlePointer { handle in + client.withKafkaHandlePointer { handle in rd_kafka_flush(handle, timeout) continuation.resume() } } - for (_, topicHandle) in self.topicHandles { + for (_, topicHandle) in topicHandles { rd_kafka_topic_destroy(topicHandle) } - - self.state = .shutDown } /// Start polling Kafka for acknowledged messages. @@ -208,9 +251,15 @@ public actor KafkaProducer { /// - Returns: An awaitable task representing the execution of the poll loop. 
public func run(pollInterval: Duration = .milliseconds(100)) async throws { // TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle) - while self.state == .started { - self.client.poll(timeout: 0) - try await Task.sleep(for: pollInterval) + while !Task.isCancelled { + let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } + switch nextAction { + case .poll(let client): + client.poll(timeout: 0) + try await Task.sleep(for: pollInterval) + case .killPollLoop: + return + } } } @@ -223,16 +272,24 @@ public actor KafkaProducer { /// - Throws: A ``KafkaError`` if sending the message failed. @discardableResult public func send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID { - switch self.state { - case .started: - return try self._send(message) - case .shuttingDown, .shutDown: - throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer") + let action = try self.stateMachine.withLockedValue { try $0.send() } + switch action { + case .send(let client, let newMessageID): + try self._send( + client: client, + message: message, + newMessageID: newMessageID + ) + return KafkaProducerMessageID(rawValue: newMessageID) } } - private func _send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID { - let topicHandle = try self.createTopicHandleIfNeeded(topic: message.topic) + private func _send( + client: KafkaClient, + message: KafkaProducerMessage, + newMessageID: UInt + ) throws { + let topicHandle = try self._createTopicHandleIfNeeded(topic: message.topic) let keyBytes: [UInt8]? if var key = message.key { @@ -241,8 +298,6 @@ public actor KafkaProducer { keyBytes = nil } - self.messageIDCounter += 1 - let responseCode = message.value.withUnsafeReadableBytes { valueBuffer in // Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster. 
@@ -255,36 +310,230 @@ public actor KafkaProducer { valueBuffer.count, keyBytes, keyBytes?.count ?? 0, - UnsafeMutableRawPointer(bitPattern: self.messageIDCounter) + UnsafeMutableRawPointer(bitPattern: newMessageID) ) } guard responseCode == 0 else { throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error()) } - - return KafkaProducerMessageID(rawValue: self.messageIDCounter) } /// Check `topicHandles` for a handle matching the topic name and create a new handle if needed. /// - Parameter topic: The name of the topic that is addressed. - private func createTopicHandleIfNeeded(topic: String) throws -> OpaquePointer? { - if let handle = self.topicHandles[topic] { - return handle - } else { - let newHandle = try self.client.withKafkaHandlePointer { handle in - let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfig: self.topicConfig) - return rd_kafka_topic_new( - handle, - topic, - rdTopicConf + private func _createTopicHandleIfNeeded(topic: String) throws -> OpaquePointer? { + try self.stateMachine.withLockedValue { state in + let action = try state.createTopicHandleIfNeeded(topic: topic) + switch action { + case .handleExists(let handle): + return handle + case .createTopicHandle(let client, let topic): + let newHandle = try client.withKafkaHandlePointer { handle in + let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfig: self.topicConfig) + return rd_kafka_topic_new( + handle, + topic, + rdTopicConf + ) + // rd_kafka_topic_new deallocates topic config object + } + if let newHandle { + try state.addTopicHandle(topic: topic, handle: newHandle) + } + return newHandle + } + } + } +} + +// MARK: - KafkaProducer + StateMachine + +extension KafkaProducer { + /// State machine representing the state of the ``KafkaProducer``. + struct StateMachine { + /// A logger. + let logger: Logger + + /// The state of the ``StateMachine``. 
+ enum State { + /// The state machine has been initialized with init() but is not yet initialized + /// using `func initialize()` (required). + case uninitialized + /// The ``KafkaProducer`` has started and is ready to use. + /// + /// - Parameter messageIDCounter: Used to incrementally assign unique IDs to messages. + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + /// - Parameter topicHandles: Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer. + case started( + client: KafkaClient, + messageIDCounter: UInt, + source: Producer.Source?, + topicHandles: [String: OpaquePointer] + ) + /// The ``KafkaProducer`` has been shut down and cannot be used anymore. + case finished + } + + /// The current state of the StateMachine. + var state: State = .uninitialized + + /// Delayed initialization of `StateMachine` as the `source` is not yet available + /// when the normal initialization occurs. + mutating func initialize( + client: KafkaClient, + source: Producer.Source? + ) { + guard case .uninitialized = self.state else { + fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") + } + self.state = .started( + client: client, + messageIDCounter: 0, + source: source, + topicHandles: [:] + ) + } + + /// Action to be taken when wanting to poll. + enum PollLoopAction { + /// Poll client for new consumer messages. + case poll(client: KafkaClient) + /// Kill the poll loop. + case killPollLoop + } + + /// Returns the next action to be taken when wanting to poll. + /// - Returns: The next action to be taken, either polling or killing the poll loop. + /// + /// - Important: This function throws a `fatalError` if called while in the `.uninitialized` state.
+ func nextPollLoopAction() -> PollLoopAction { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .started(let client, _, _, _): + return .poll(client: client) + case .finished: + return .killPollLoop + } + } + + /// Action to take when wanting to create a topic handle. + enum CreateTopicHandleAction { + /// Do create a new topic handle. + case createTopicHandle( + client: KafkaClient, + topic: String + ) + /// No need to create a new handle. It exists already: `handle`. + case handleExists(handle: OpaquePointer) + } + + /// Returns action to be taken when wanting to create a new topic handle. + func createTopicHandleIfNeeded(topic: String) throws -> CreateTopicHandleAction { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .started(let client, _, _, let topicHandles): + if let handle = topicHandles[topic] { + return .handleExists(handle: handle) + } else { + return .createTopicHandle(client: client, topic: topic) + } + case .finished: + throw KafkaError.connectionClosed(reason: "Tried to create topic handle on closed connection") + } + } + + /// Add a newly created topic handle to the list of topic handles contained in the state machine. + mutating func addTopicHandle( + topic: String, + handle: OpaquePointer + ) throws { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .started(let client, let messageIDCounter, let source, let topicHandles): + var topicHandles = topicHandles + topicHandles[topic] = handle + self.state = .started( + client: client, + messageIDCounter: messageIDCounter, + source: source, + topicHandles: topicHandles + ) + case .finished: + throw KafkaError.connectionClosed(reason: "Tried to create topic handle on closed connection") + } + } + + /// Action to be taken when wanting to send a message. 
+ enum SendAction { + /// Send the message. + /// + /// - Important: `newMessageID` is the new message ID assigned to the message to be sent. + case send( + client: KafkaClient, + newMessageID: UInt + ) + } + + /// Get action to be taken when wanting to send a message. + /// + /// - Returns: The action to be taken. + mutating func send() throws -> SendAction { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .started(let client, let messageIDCounter, let source, let topicHandles): + let newMessageID = messageIDCounter + 1 + self.state = .started( + client: client, + messageIDCounter: newMessageID, + source: source, + topicHandles: topicHandles ) - // rd_kafka_topic_new deallocates topic config object + return .send( + client: client, + newMessageID: newMessageID + ) + case .finished: + throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer") } - if newHandle != nil { - self.topicHandles[topic] = newHandle + } + + /// Action to be taken when wanting to close the producer. + enum FinishAction { + /// Shut down the ``KafkaProducer`` and finish the given `source` object. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + /// - Parameter topicHandles: Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer. + case shutdownGracefullyAndFinishSource( + client: KafkaClient, + source: Producer.Source?, + topicHandles: [String: OpaquePointer] + ) + } + + /// Get action to be taken when wanting to close the producer. + /// - Returns: The action to be taken, or `nil` if there is no action to be taken. + /// + /// - Important: This function throws a `fatalError` if called while in the `.uninitialized` state. + mutating func finish() -> FinishAction?
{ + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .started(let client, _, let source, let topicHandles): + self.state = .finished + return .shutdownGracefullyAndFinishSource( + client: client, + source: source, + topicHandles: topicHandles + ) + case .finished: + return nil } - return newHandle } } } diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift index 418b76ff..31cc9b1f 100644 --- a/Tests/IntegrationTests/SwiftKafkaTests.swift +++ b/Tests/IntegrationTests/SwiftKafkaTests.swift @@ -74,7 +74,7 @@ final class SwiftKafkaTests: XCTestCase { func testProduceAndConsumeWithConsumerGroup() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) await withThrowingTaskGroup(of: Void.self) { group in // Run Task @@ -131,7 +131,7 @@ final class SwiftKafkaTests: XCTestCase { func testProduceAndConsumeWithAssignedTopicPartition() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) await withThrowingTaskGroup(of: Void.self) { group in // Run Task @@ -192,7 +192,7 @@ final class SwiftKafkaTests: XCTestCase { func testProduceAndConsumeWithCommitSync() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, 
logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) await withThrowingTaskGroup(of: Void.self) { group in // Run Task @@ -273,7 +273,7 @@ final class SwiftKafkaTests: XCTestCase { var messageIDs = Set() for message in messages { - messageIDs.insert(try await producer.send(message)) + messageIDs.insert(try producer.send(message)) } var acknowledgedMessages = Set() diff --git a/Tests/SwiftKafkaTests/KafkaProducerTests.swift b/Tests/SwiftKafkaTests/KafkaProducerTests.swift index 666da3c4..6f98588d 100644 --- a/Tests/SwiftKafkaTests/KafkaProducerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaProducerTests.swift @@ -52,7 +52,7 @@ final class KafkaProducerTests: XCTestCase { } func testSend() async throws { - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) await withThrowingTaskGroup(of: Void.self) { group in @@ -70,7 +70,7 @@ final class KafkaProducerTests: XCTestCase { value: "Hello, World!" 
) - let messageID = try await producer.send(message) + let messageID = try producer.send(message) for await messageResult in acks { guard case .success(let acknowledgedMessage) = messageResult else { @@ -91,7 +91,7 @@ final class KafkaProducerTests: XCTestCase { } func testSendEmptyMessage() async throws { - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) await withThrowingTaskGroup(of: Void.self) { group in @@ -108,7 +108,7 @@ final class KafkaProducerTests: XCTestCase { value: ByteBuffer() ) - let messageID = try await producer.send(message) + let messageID = try producer.send(message) for await messageResult in acks { guard case .success(let acknowledgedMessage) = messageResult else { @@ -129,7 +129,7 @@ final class KafkaProducerTests: XCTestCase { } func testSendTwoTopics() async throws { - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) await withThrowingTaskGroup(of: Void.self) { group in // Run Task @@ -152,8 +152,8 @@ final class KafkaProducerTests: XCTestCase { var messageIDs = Set() - messageIDs.insert(try await producer.send(message1)) - messageIDs.insert(try await producer.send(message2)) + messageIDs.insert(try producer.send(message1)) + messageIDs.insert(try producer.send(message2)) var acknowledgedMessages = Set() @@ -185,7 +185,7 @@ final class KafkaProducerTests: XCTestCase { } func testProducerNotUsableAfterShutdown() async throws { - let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) await 
producer.shutdownGracefully() await withThrowingTaskGroup(of: Void.self) { group in @@ -203,7 +203,7 @@ final class KafkaProducerTests: XCTestCase { ) do { - try await producer.send(message) + try producer.send(message) XCTFail("Method should have thrown error") } catch {} @@ -219,7 +219,7 @@ final class KafkaProducerTests: XCTestCase { func testNoMemoryLeakAfterShutdown() async throws { var producer: KafkaProducer? var acks: KafkaMessageAcknowledgements? - (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) + (producer, acks) = try KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest) _ = acks weak var producerCopy = producer