Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaConsumer.commitAsync #126

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 41 additions & 10 deletions Sources/Kafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ public final class KafkaConsumer: Sendable, Service {
}

/// Mark all messages up to the passed message in the topic as read.
/// Schedules a commit and returns immediately.
/// Any errors encountered after scheduling the commit will be discarded.
///
/// This method is only used for manual offset management.
///
Expand All @@ -367,17 +369,46 @@ public final class KafkaConsumer: Sendable, Service {
/// - Parameters:
/// - message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if scheduling the commit failed.
public func scheduleCommit(_ message: KafkaConsumerMessage) throws {
    // Ask the state machine, while holding its lock, what a commit should do right now.
    switch self.stateMachine.withLockedValue({ $0.commit() }) {
    case .commit(let client):
        // Manual commits are rejected unless auto-commit has been switched off.
        let autoCommitEnabled = self.configuration.isAutoCommitEnabled
        guard autoCommitEnabled == false else {
            throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
        }

        // Hand the message to the client; this call only schedules the commit.
        try client.scheduleCommit(message)

    case .throwClosedError:
        throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer")
    }
}

@available(*, deprecated, renamed: "commit")
public func commitSync(_ message: KafkaConsumerMessage) async throws {
let action = self.stateMachine.withLockedValue { $0.commitSync() }
try await self.commit(message)
}

/// Mark all messages up to the passed message in the topic as read.
/// Awaits until the commit succeeds or an error is encountered.
///
/// This method is only used for manual offset management.
///
/// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default).
///
/// - Parameters:
/// - message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
public func commit(_ message: KafkaConsumerMessage) async throws {
let action = self.stateMachine.withLockedValue { $0.commit() }
switch action {
case .throwClosedError:
throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer")
case .commitSync(let client):
case .commit(let client):
guard self.configuration.isAutoCommitEnabled == false else {
throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
}

try await client.commitSync(message)
try await client.commit(message)
}
}

Expand Down Expand Up @@ -596,23 +627,23 @@ extension KafkaConsumer {
}
}

/// Action to be taken when wanting to do a synchronous commit.
enum CommitSyncAction {
/// Do a synchronous commit.
/// Action to be taken when wanting to do a commit.
enum CommitAction {
/// Do a commit.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case commitSync(
case commit(
client: RDKafkaClient
)
/// Throw an error. The ``KafkaConsumer`` is closed.
case throwClosedError
}

/// Get action to be taken when wanting to do a synchronous commit.
/// Get action to be taken when wanting to do a commit.
/// - Returns: The action to be taken.
///
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
func commitSync() -> CommitSyncAction {
func commit() -> CommitAction {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
Expand All @@ -621,7 +652,7 @@ extension KafkaConsumer {
case .consumptionStopped:
fatalError("Cannot commit when consumption has been stopped")
case .consuming(let client, _):
return .commitSync(client: client)
return .commit(client: client)
case .finishing, .finished:
return .throwClosedError
}
Expand Down
37 changes: 34 additions & 3 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -532,15 +532,46 @@ final class RDKafkaClient: Sendable {
}
}

/// Non-blocking **awaitable** commit of a the `message`'s offset to Kafka.
/// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka.
/// Schedules a commit and returns immediately.
/// Any errors encountered after scheduling the commit will be discarded.
///
/// - Parameter message: Last received message that shall be marked as read.
func commitSync(_ message: KafkaConsumerMessage) async throws {
/// - Throws: A ``KafkaError`` if scheduling the commit failed.
func scheduleCommit(_ message: KafkaConsumerMessage) throws {
    // Kafka stores the offset of the *next* message to be requested, so the
    // offset that gets committed is the current message's offset plus one.
    // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
    let offsetsToCommit = RDKafkaTopicPartitionList()
    let nextOffset = Int64(message.offset.rawValue + 1)
    offsetsToCommit.setOffset(
        topic: message.topic,
        partition: message.partition,
        offset: nextOffset
    )

    // The trailing `1` is the `async` flag of `rd_kafka_commit` (async = true).
    let commitResult = offsetsToCommit.withListPointer {
        rd_kafka_commit(self.kafkaHandle, $0, 1)
    }

    // Anything other than "no error" is surfaced to the caller.
    guard commitResult == RD_KAFKA_RESP_ERR_NO_ERROR else {
        throw KafkaError.rdKafkaError(wrapping: commitResult)
    }
}

/// Non-blocking **awaitable** commit of a `message`'s offset to Kafka.
///
/// - Parameter message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if the commit failed.
func commit(_ message: KafkaConsumerMessage) async throws {
// Declare captured closure outside of withCheckedContinuation.
// We do that because we do an unretained pass of the captured closure to
// librdkafka which means we have to keep a reference to the closure
// ourselves to make sure it does not get deallocated before
// commitSync returns.
// commit returns.
var capturedClosure: CapturedCommitCallback!
try await withCheckedThrowingContinuation { continuation in
capturedClosure = CapturedCommitCallback { result in
Expand Down
64 changes: 61 additions & 3 deletions Tests/IntegrationTests/KafkaTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ final class KafkaTests: XCTestCase {
}
}

func testProduceAndConsumeWithCommitSync() async throws {
func testProduceAndConsumeWithScheduleCommit() async throws {
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)

Expand Down Expand Up @@ -254,7 +254,65 @@ final class KafkaTests: XCTestCase {
var consumedMessages = [KafkaConsumerMessage]()
for try await message in consumer.messages {
consumedMessages.append(message)
try await consumer.commitSync(message)
try consumer.scheduleCommit(message)

if consumedMessages.count >= testMessages.count {
break
}
}

XCTAssertEqual(testMessages.count, consumedMessages.count)
}

// Wait for Producer Task and Consumer Task to complete
try await group.next()
try await group.next()
// Shutdown the serviceGroup
await serviceGroup.triggerGracefulShutdown()
}
}

func testProduceAndConsumeWithCommit() async throws {
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)

var consumerConfig = KafkaConsumerConfiguration(
consumptionStrategy: .group(id: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]),
bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
)
consumerConfig.isAutoCommitEnabled = false
consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning
consumerConfig.broker.addressFamily = .v4

let consumer = try KafkaConsumer(
configuration: consumerConfig,
logger: .kafkaTest
)

let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer, consumer], logger: .kafkaTest)
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)

try await withThrowingTaskGroup(of: Void.self) { group in
// Consumer Run Task
group.addTask {
try await serviceGroup.run()
}

// Producer Task
group.addTask {
try await Self.sendAndAcknowledgeMessages(
producer: producer,
events: events,
messages: testMessages
)
}

// Consumer Task
group.addTask {
var consumedMessages = [KafkaConsumerMessage]()
for try await message in consumer.messages {
consumedMessages.append(message)
try await consumer.commit(message)

if consumedMessages.count >= testMessages.count {
break
Expand Down Expand Up @@ -323,7 +381,7 @@ final class KafkaTests: XCTestCase {
continue
}
consumedMessages.append(message)
try await consumer.commitSync(message)
try await consumer.commit(message)

if consumedMessages.count >= testMessages.count {
break
Expand Down