Skip to content

Commit

Permalink
KafkaConsumer.commitAsync
Browse files Browse the repository at this point in the history
Motivation:

Having `KafkaConsumer.commitSync` be `async` is not always convenient as
it suspends the `KafkaConsumer.messages` read loop and can therefore
lower throughput.

This PR introduces a new method `KafkaConsumer.commitAsync` that allows
users who don't care about the result of the `commit` to commit in a
"fire-and-forget" manner.

Modifications:

* new method `KafkaConsumer.commitAsync`
* rename `KafkaConsumer.StateMachine.commitSync` to
  `KafkaConsumer.StateMachine.commit` to serve both `commitSync` and
  `commitAsync`
* add new test for `KafkaConsumer.commitAsync`
  • Loading branch information
felixschlegel committed Aug 29, 2023
1 parent 8592c61 commit 0324457
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 10 deletions.
44 changes: 35 additions & 9 deletions Sources/Kafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,32 @@ public final class KafkaConsumer: Sendable, Service {
}

/// Mark all messages up to the passed message in the topic as read, without waiting for the outcome.
///
/// The commit is only scheduled: this method returns immediately and
/// any error encountered after scheduling the commit is discarded.
///
/// This method is only used for manual offset management.
///
/// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default).
///
/// - Parameters:
///     - message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if the consumer is already closed, if auto-commit is enabled,
///   or if scheduling the commit failed.
public func commitAsync(_ message: KafkaConsumerMessage) throws {
    // The state machine is consulted inside `withLockedValue`;
    // the commit itself is issued after that closure has returned.
    let commitAction = self.stateMachine.withLockedValue { state in state.commit() }

    switch commitAction {
    case .commit(let client):
        // Manual commits are rejected while auto-commit is enabled.
        if self.configuration.isAutoCommitEnabled {
            throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
        }
        try client.commitAsync(message)
    case .throwClosedError:
        throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer")
    }
}

/// Mark all messages up to the passed message in the topic as read.
/// Awaits until the commit succeeds or an error is encountered.
///
/// This method is only used for manual offset management.
///
Expand All @@ -368,11 +394,11 @@ public final class KafkaConsumer: Sendable, Service {
/// - message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
public func commitSync(_ message: KafkaConsumerMessage) async throws {
let action = self.stateMachine.withLockedValue { $0.commitSync() }
let action = self.stateMachine.withLockedValue { $0.commit() }
switch action {
case .throwClosedError:
throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer")
case .commitSync(let client):
case .commit(let client):
guard self.configuration.isAutoCommitEnabled == false else {
throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
}
Expand Down Expand Up @@ -596,23 +622,23 @@ extension KafkaConsumer {
}
}

/// Action to be taken when wanting to do a synchronous commit.
enum CommitSyncAction {
/// Do a synchronous commit.
/// Action to be taken when wanting to do a commit.
enum CommitAction {
/// Do a commit.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case commitSync(
case commit(
client: RDKafkaClient
)
/// Throw an error. The ``KafkaConsumer`` is closed.
case throwClosedError
}

/// Get action to be taken when wanting to do a synchronous commit.
/// Get action to be taken when wanting to do a commit.
/// - Returns: The action to be taken.
///
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
func commitSync() -> CommitSyncAction {
func commit() -> CommitAction {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
Expand All @@ -621,7 +647,7 @@ extension KafkaConsumer {
case .consumptionStopped:
fatalError("Cannot commit when consumption has been stopped")
case .consuming(let client, _):
return .commitSync(client: client)
return .commit(client: client)
case .finishing, .finished:
return .throwClosedError
}
Expand Down
33 changes: 32 additions & 1 deletion Sources/Kafka/RDKafka/RDKafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,40 @@ final class RDKafkaClient: Sendable {
}
}

/// Non-blocking **awaitable** commit of a the `message`'s offset to Kafka.
/// Schedules a "fire-and-forget" commit of a `message`'s offset to Kafka.
///
/// The call does not block: it returns as soon as the commit has been scheduled.
/// Any errors encountered after scheduling the commit will be discarded.
///
/// - Parameter message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if scheduling the commit failed.
func commitAsync(_ message: KafkaConsumerMessage) throws {
    let offsets = RDKafkaTopicPartitionList()

    // A committed offset always denotes the next message to be requested,
    // hence the offset of the current message is incremented by one.
    // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
    let nextOffset = Int64(message.offset.rawValue + 1)
    offsets.setOffset(topic: message.topic, partition: message.partition, offset: nextOffset)

    let result = offsets.withListPointer { pointer in
        rd_kafka_commit(self.kafkaHandle, pointer, 1) // async = true
    }

    guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
        throw KafkaError.rdKafkaError(wrapping: result)
    }
}

/// Non-blocking **awaitable** commit of a `message`'s offset to Kafka.
///
/// - Parameter message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if the commit failed.
func commitSync(_ message: KafkaConsumerMessage) async throws {
// Declare captured closure outside of withCheckedContinuation.
// We do that because do an unretained pass of the captured closure to
Expand Down
58 changes: 58 additions & 0 deletions Tests/IntegrationTests/KafkaTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,64 @@ final class KafkaTests: XCTestCase {
}
}

/// Produces a batch of test messages, consumes them with auto-commit disabled and
/// commits every consumed message through ``KafkaConsumer/commitAsync(_:)``.
///
/// The test passes when all produced messages were consumed and none of the
/// `commitAsync` calls threw while scheduling the commit.
func testProduceAndConsumeWithCommitAsync() async throws {
    let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
    let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)

    // Use a consumer group dedicated to this test rather than the group id of the
    // commitSync test, so both tests do not share one consumer group.
    var consumerConfig = KafkaConsumerConfiguration(
        consumptionStrategy: .group(id: "commit-async-test-group-id", topics: [self.uniqueTestTopic]),
        bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
    )
    // Manual commits require auto-commit to be turned off.
    consumerConfig.isAutoCommitEnabled = false
    consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning
    consumerConfig.broker.addressFamily = .v4

    let consumer = try KafkaConsumer(
        configuration: consumerConfig,
        logger: .kafkaTest
    )

    let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer, consumer], logger: .kafkaTest)
    let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)

    try await withThrowingTaskGroup(of: Void.self) { group in
        // Consumer Run Task
        group.addTask {
            try await serviceGroup.run()
        }

        // Producer Task
        group.addTask {
            try await Self.sendAndAcknowledgeMessages(
                producer: producer,
                events: events,
                messages: testMessages
            )
        }

        // Consumer Task
        group.addTask {
            var consumedMessages = [KafkaConsumerMessage]()
            for try await message in consumer.messages {
                consumedMessages.append(message)
                // Fire-and-forget commit: only failures to schedule the commit are thrown here.
                try consumer.commitAsync(message)

                if consumedMessages.count >= testMessages.count {
                    break
                }
            }

            XCTAssertEqual(testMessages.count, consumedMessages.count)
        }

        // Wait for Producer Task and Consumer Task to complete
        try await group.next()
        try await group.next()
        // Shutdown the serviceGroup
        await serviceGroup.triggerGracefulShutdown()
    }
}

func testProduceAndConsumeWithCommitSync() async throws {
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
Expand Down

0 comments on commit 0324457

Please sign in to comment.