From dd7e2338cef5ea5b65531710a4ffb861294e813b Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Wed, 6 Sep 2023 10:17:30 +0100 Subject: [PATCH] Review Franz Modifications: * `KafkaConsumer`: * rename `commitSync(_:)` -> `commit(_:)` * rename `commitAsync(_:)` -> `scheduleCommit(_:)` * `RDKafkaClient`: * rename `commitSync(_:)` -> `commit(_:)` * rename `commitAsync(_:)` -> `scheduleCommit(_:)` --- Sources/Kafka/KafkaConsumer.swift | 13 +++++++++---- Sources/Kafka/RDKafka/RDKafkaClient.swift | 6 +++--- Tests/IntegrationTests/KafkaTests.swift | 10 +++++----- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index c7f766bf..8fd4411c 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -369,7 +369,7 @@ public final class KafkaConsumer: Sendable, Service { /// - Parameters: /// - message: Last received message that shall be marked as read. /// - Throws: A ``KafkaError`` if committing failed. - public func commitAsync(_ message: KafkaConsumerMessage) throws { + public func triggerCommit(_ message: KafkaConsumerMessage) throws { let action = self.stateMachine.withLockedValue { $0.commit() } switch action { case .throwClosedError: @@ -379,10 +379,15 @@ public final class KafkaConsumer: Sendable, Service { throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false") } - try client.commitAsync(message) + try client.triggerCommit(message) } } + @available(*, deprecated, renamed: "commit") + public func commitSync(_ message: KafkaConsumerMessage) async throws { + try await self.commit(message) + } + /// Mark all messages up to the passed message in the topic as read. /// Awaits until the commit succeeds or an error is encountered. /// @@ -393,7 +398,7 @@ public final class KafkaConsumer: Sendable, Service { /// - Parameters: /// - message: Last received message that shall be marked as read. /// - Throws: A ``KafkaError`` if committing failed. - public func commitSync(_ message: KafkaConsumerMessage) async throws { + public func commit(_ message: KafkaConsumerMessage) async throws { let action = self.stateMachine.withLockedValue { $0.commit() } switch action { case .throwClosedError: @@ -403,7 +408,7 @@ public final class KafkaConsumer: Sendable, Service { throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false") } - try await client.commitSync(message) + try await client.commit(message) } } diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 12a0ffc9..56502a58 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -538,7 +538,7 @@ final class RDKafkaClient: Sendable { /// /// - Parameter message: Last received message that shall be marked as read. /// - Throws: A ``KafkaError`` if scheduling the commit failed. - func commitAsync(_ message: KafkaConsumerMessage) throws { + func triggerCommit(_ message: KafkaConsumerMessage) throws { // The offset committed is always the offset of the next requested message. // Thus, we increase the offset of the current message by one before committing it. // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 @@ -566,12 +566,12 @@ final class RDKafkaClient: Sendable { /// /// - Parameter message: Last received message that shall be marked as read. /// - Throws: A ``KafkaError`` if the commit failed. - func commitSync(_ message: KafkaConsumerMessage) async throws { + func commit(_ message: KafkaConsumerMessage) async throws { // Declare captured closure outside of withCheckedContinuation. // We do that because do an unretained pass of the captured closure to // librdkafka which means we have to keep a reference to the closure // ourselves to make sure it does not get deallocated before - // commitSync returns. + // commit returns. var capturedClosure: CapturedCommitCallback! try await withCheckedThrowingContinuation { continuation in capturedClosure = CapturedCommitCallback { result in diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift index 270c3aa1..06cf314c 100644 --- a/Tests/IntegrationTests/KafkaTests.swift +++ b/Tests/IntegrationTests/KafkaTests.swift @@ -214,7 +214,7 @@ final class KafkaTests: XCTestCase { } } - func testProduceAndConsumeWithCommitAsync() async throws { + func testProduceAndConsumeWithTriggerCommit() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) @@ -254,7 +254,7 @@ final class KafkaTests: XCTestCase { var consumedMessages = [KafkaConsumerMessage]() for try await message in consumer.messages { consumedMessages.append(message) - try consumer.commitAsync(message) + try consumer.triggerCommit(message) if consumedMessages.count >= testMessages.count { break @@ -272,7 +272,7 @@ final class KafkaTests: XCTestCase { } } - func testProduceAndConsumeWithCommitSync() async throws { + func testProduceAndConsumeWithCommit() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) @@ -312,7 +312,7 @@ final class KafkaTests: XCTestCase { var consumedMessages = [KafkaConsumerMessage]() for try await message in consumer.messages { consumedMessages.append(message) - try await consumer.commitSync(message) + try await consumer.commit(message) if consumedMessages.count >= testMessages.count { break @@ -381,7 +381,7 @@ final class KafkaTests: XCTestCase { continue } consumedMessages.append(message) - try await consumer.commitSync(message) + try await consumer.commit(message) if consumedMessages.count >= testMessages.count { break