diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 12fe0dca..616c434a 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -359,6 +359,8 @@ public final class KafkaConsumer: Sendable, Service { } /// Mark all messages up to the passed message in the topic as read. + /// Schedules a commit and returns immediately. + /// Any errors encountered after scheduling the commit will be discarded. /// /// This method is only used for manual offset management. /// @@ -367,17 +369,46 @@ public final class KafkaConsumer: Sendable, Service { /// - Parameters: /// - message: Last received message that shall be marked as read. /// - Throws: A ``KafkaError`` if committing failed. + public func scheduleCommit(_ message: KafkaConsumerMessage) throws { + let action = self.stateMachine.withLockedValue { $0.commit() } + switch action { + case .throwClosedError: + throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer") + case .commit(let client): + guard self.configuration.isAutoCommitEnabled == false else { + throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false") + } + + try client.scheduleCommit(message) + } + } + + @available(*, deprecated, renamed: "commit") public func commitSync(_ message: KafkaConsumerMessage) async throws { - let action = self.stateMachine.withLockedValue { $0.commitSync() } + try await self.commit(message) + } + + /// Mark all messages up to the passed message in the topic as read. + /// Awaits until the commit succeeds or an error is encountered. + /// + /// This method is only used for manual offset management. + /// + /// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default). + /// + /// - Parameters: + /// - message: Last received message that shall be marked as read. + /// - Throws: A ``KafkaError`` if committing failed. + public func commit(_ message: KafkaConsumerMessage) async throws { + let action = self.stateMachine.withLockedValue { $0.commit() } switch action { case .throwClosedError: throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer") - case .commitSync(let client): + case .commit(let client): guard self.configuration.isAutoCommitEnabled == false else { throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false") } - try await client.commitSync(message) + try await client.commit(message) } } @@ -596,23 +627,23 @@ extension KafkaConsumer { } } - /// Action to be taken when wanting to do a synchronous commit. - enum CommitSyncAction { - /// Do a synchronous commit. + /// Action to be taken when wanting to do a commit. + enum CommitAction { + /// Do a commit. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case commitSync( + case commit( client: RDKafkaClient ) /// Throw an error. The ``KafkaConsumer`` is closed. case throwClosedError } - /// Get action to be taken when wanting to do a synchronous commit. + /// Get action to be taken when wanting to do a commit. /// - Returns: The action to be taken. /// /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. - func commitSync() -> CommitSyncAction { + func commit() -> CommitAction { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") @@ -621,7 +652,7 @@ extension KafkaConsumer { case .consumptionStopped: fatalError("Cannot commit when consumption has been stopped") case .consuming(let client, _): - return .commitSync(client: client) + return .commit(client: client) case .finishing, .finished: return .throwClosedError } diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 42686937..da415f22 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -532,15 +532,46 @@ final class RDKafkaClient: Sendable { } } - /// Non-blocking **awaitable** commit of a the `message`'s offset to Kafka. + /// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka. + /// Schedules a commit and returns immediately. + /// Any errors encountered after scheduling the commit will be discarded. /// /// - Parameter message: Last received message that shall be marked as read. - func commitSync(_ message: KafkaConsumerMessage) async throws { + /// - Throws: A ``KafkaError`` if scheduling the commit failed. + func scheduleCommit(_ message: KafkaConsumerMessage) throws { + // The offset committed is always the offset of the next requested message. + // Thus, we increase the offset of the current message by one before committing it. + // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 + let changesList = RDKafkaTopicPartitionList() + changesList.setOffset( + topic: message.topic, + partition: message.partition, + offset: Int64(message.offset.rawValue + 1) + ) + + let error = changesList.withListPointer { listPointer in + return rd_kafka_commit( + self.kafkaHandle, + listPointer, + 1 // async = true + ) + } + + if error != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: error) + } + } + + /// Non-blocking **awaitable** commit of a `message`'s offset to Kafka. + /// + /// - Parameter message: Last received message that shall be marked as read. + /// - Throws: A ``KafkaError`` if the commit failed. + func commit(_ message: KafkaConsumerMessage) async throws { // Declare captured closure outside of withCheckedContinuation. // We do that because do an unretained pass of the captured closure to // librdkafka which means we have to keep a reference to the closure // ourselves to make sure it does not get deallocated before - // commitSync returns. + // commit returns. var capturedClosure: CapturedCommitCallback! try await withCheckedThrowingContinuation { continuation in capturedClosure = CapturedCommitCallback { result in diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift index 5bc233bd..e6cf82e5 100644 --- a/Tests/IntegrationTests/KafkaTests.swift +++ b/Tests/IntegrationTests/KafkaTests.swift @@ -214,7 +214,7 @@ final class KafkaTests: XCTestCase { } } - func testProduceAndConsumeWithCommitSync() async throws { + func testProduceAndConsumeWithScheduleCommit() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) @@ -254,7 +254,65 @@ final class KafkaTests: XCTestCase { var consumedMessages = [KafkaConsumerMessage]() for try await message in consumer.messages { consumedMessages.append(message) - try await consumer.commitSync(message) + try consumer.scheduleCommit(message) + + if consumedMessages.count >= testMessages.count { + break + } + } + + XCTAssertEqual(testMessages.count, consumedMessages.count) + } + + // Wait for Producer Task and Consumer Task to complete + try await group.next() + try await group.next() + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + } + + func testProduceAndConsumeWithCommit() async throws { + let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) + let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) + + var consumerConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]), + bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] + ) + consumerConfig.isAutoCommitEnabled = false + consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning + consumerConfig.broker.addressFamily = .v4 + + let consumer = try KafkaConsumer( + configuration: consumerConfig, + logger: .kafkaTest + ) + + let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer, consumer], logger: .kafkaTest) + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Consumer Run Task + group.addTask { + try await serviceGroup.run() + } + + // Producer Task + group.addTask { + try await Self.sendAndAcknowledgeMessages( + producer: producer, + events: events, + messages: testMessages + ) + } + + // Consumer Task + group.addTask { + var consumedMessages = [KafkaConsumerMessage]() + for try await message in consumer.messages { + consumedMessages.append(message) + try await consumer.commit(message) if consumedMessages.count >= testMessages.count { break @@ -323,7 +381,7 @@ final class KafkaTests: XCTestCase { continue } consumedMessages.append(message) - try await consumer.commitSync(message) + try await consumer.commit(message) if consumedMessages.count >= testMessages.count { break