Skip to content

Commit

Permalink
Fix: KafkaClient.closeConsumer should not block (#73)
Browse files Browse the repository at this point in the history
* Fix: make `KafkaConsumer.commitSync` non-blocking

Motivation:

Currently our invocation to `rd_kafka_commit` inside of
`KafkaCosumer.commitSync` is blocking a cooperative thread.
This PR aims to make `KafkaCosumer.commitSync` non-blocking by using the
callback-based commit API.

Modifications:

* move `commitSync` logic to `KafkaClient`
* replace the blocking invocation to
  [rd_kafka_commit](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#ab96539928328f14c3c9177ea0c896c87)
  with a callback-based invocation to
  [rd_kafka_commit_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#af76a6a73baa9c2621536e3f6882a3c1a)
  which is then wrapped inside a `withAsyncThrowingContinuation` statement

* `KafkaClient.consumerClose`: make non-blocking

Motivation:

[rd_kakfa_consumer_close](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a37b54d329e12d745889defe96e7d043d)
was blocking. This PR proposes using the
[rd_kakfa_consumer_close_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a9dd5c18bdfed81c8847b259f0a8d498d)
API which is non-blocking and served through the normal poll loop.
We now
Modifications:

* `KafkaClient.consumerClose`: use `rd_kakfa_consumer_close_queue` in
  favour of `rd_kakfa_consumer_close`
* create a new variable `KafkaClient.isConsumerClosed` that indicates
  if the poll loop needs to continue polling or if it can stop running
* updated state management in `KafkaConsumer` to accomodate for polling
  when the `KafkaConsumer` is in the process of closing

Result:

Calling `KafkaClient.consumerClose` is not blocking anymore.

* Review Franz

Modifications:

* introduce new `KafkaConsumer.StateMachine.State` `.finishing` to avoid
  retaining `client` in state `.finished`
* rename `KafkaConsumer.shutdownGracefully` to
  `KafkaConsumer.triggerGracefulShutdown`
* add note that `KafkaConsumer.commitSync` does not support `Task`
  cancellation
  • Loading branch information
felixschlegel authored Jun 30, 2023
1 parent 77d0b0e commit 588af60
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 58 deletions.
96 changes: 92 additions & 4 deletions Sources/SwiftKafka/KafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,102 @@ final class KafkaClient {
}
}

/// Close the consumer.
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
/// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``.
final class CapturedCommitCallback {
typealias Closure = (Result<Void, KafkaError>) -> Void
let closure: Closure

init(_ closure: @escaping Closure) {
self.closure = closure
}
}

/// Non-blocking commit of a the `message`'s offset to Kafka.
///
/// - Parameter message: Last received message that shall be marked as read.
func commitSync(_ message: KafkaConsumerMessage) async throws {
// Declare captured closure outside of withCheckedContinuation.
// We do that because do an unretained pass of the captured closure to
// librdkafka which means we have to keep a reference to the closure
// ourselves to make sure it does not get deallocated before
// commitSync returns.
var capturedClosure: CapturedCommitCallback!
try await withCheckedThrowingContinuation { continuation in
capturedClosure = CapturedCommitCallback { result in
continuation.resume(with: result)
}

// The offset committed is always the offset of the next requested message.
// Thus, we increase the offset of the current message by one before committing it.
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
let changesList = RDKafkaTopicPartitionList()
changesList.setOffset(
topic: message.topic,
partition: message.partition,
offset: Int64(message.offset + 1)
)

// Unretained pass because the reference that librdkafka holds to capturedClosure
// should not be counted in ARC as this can lead to memory leaks.
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()

let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)

// Create a C closure that calls the captured closure
let callbackWrapper: (
@convention(c) (
OpaquePointer?,
rd_kafka_resp_err_t,
UnsafeMutablePointer<rd_kafka_topic_partition_list_t>?,
UnsafeMutableRawPointer?
) -> Void
) = { _, error, _, opaquePointer in

guard let opaquePointer = opaquePointer else {
fatalError("Could not resolve reference to catpured Swift callback instance")
}
let opaque = Unmanaged<CapturedCommitCallback>.fromOpaque(opaquePointer).takeUnretainedValue()

let actualCallback = opaque.closure

if error == RD_KAFKA_RESP_ERR_NO_ERROR {
actualCallback(.success(()))
} else {
let kafkaError = KafkaError.rdKafkaError(wrapping: error)
actualCallback(.failure(kafkaError))
}
}

changesList.withListPointer { listPointer in
rd_kafka_commit_queue(
self.kafkaHandle,
listPointer,
consumerQueue,
callbackWrapper,
opaquePointer
)
}
}
}

/// Close the consumer asynchronously. This means revoking its assignemnt, committing offsets to broker and
/// leaving the consumer group (if applicable).
///
/// Make sure to run poll loop until ``KafkaClient/consumerIsClosed`` returns `true`.
func consumerClose() throws {
let result = rd_kafka_consumer_close(self.kafkaHandle)
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: result)
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
let result = rd_kafka_consumer_close_queue(self.kafkaHandle, consumerQueue)
let kafkaError = rd_kafka_error_code(result)
if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: kafkaError)
}
}

var isConsumerClosed: Bool {
rd_kafka_consumer_closed(self.kafkaHandle) == 1
}

/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter body: The closure will use the Kafka handle pointer.
Expand Down
95 changes: 42 additions & 53 deletions Sources/SwiftKafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
}

func didTerminate() {
// Duplicate of _shutdownGracefully
// Duplicate of _triggerGracefulShutdown
let action = self.stateMachine.withLockedValue { $0.finish() }
switch action {
case .shutdownGracefullyAndFinishSource(let client, let source):
case .triggerGracefulShutdownAndFinishSource(let client, let source):
source.finish()

do {
Expand Down Expand Up @@ -151,7 +151,7 @@ public final class KafkaConsumer {
}

deinit {
self.shutdownGracefully()
self.triggerGracefulShutdown()
}

/// Subscribe to the given list of `topics`.
Expand Down Expand Up @@ -201,16 +201,20 @@ public final class KafkaConsumer {
switch nextAction {
case .pollForAndYieldMessage(let client, let source):
do {
guard let message = try client.consumerPoll() else {
break
if let message = try client.consumerPoll() {
// We do not support back pressure, we can ignore the yield result
_ = source.yield(message)
}
// We do not support back pressure, we can ignore the yield result
_ = source.yield(message)
} catch {
source.finish()
throw error
}
try await Task.sleep(for: self.config.pollInterval)
case .pollUntilClosed(let client):
// Ignore poll result, we are closing down and just polling to commit
// outstanding consumer state
_ = try client.consumerPoll()
try await Task.sleep(for: self.config.pollInterval)
case .terminatePollLoop:
return
}
Expand All @@ -222,18 +226,8 @@ public final class KafkaConsumer {
/// - Parameter message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
/// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`.
/// - Important: This method does not support `Task` cancellation.
public func commitSync(_ message: KafkaConsumerMessage) async throws {
try await withCheckedThrowingContinuation { continuation in
do {
try self._commitSync(message) // Blocks until commiting the offset is done
continuation.resume()
} catch {
continuation.resume(throwing: error)
}
}
}

private func _commitSync(_ message: KafkaConsumerMessage) throws {
let action = self.stateMachine.withLockedValue { $0.commitSync() }
switch action {
case .throwClosedError:
Expand All @@ -243,41 +237,19 @@ public final class KafkaConsumer {
throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false")
}

// The offset committed is always the offset of the next requested message.
// Thus, we increase the offset of the current message by one before committing it.
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
let changesList = RDKafkaTopicPartitionList()
changesList.setOffset(
topic: message.topic,
partition: message.partition,
offset: Int64(message.offset + 1)
)

let result = client.withKafkaHandlePointer { handle in
changesList.withListPointer { listPointer in
rd_kafka_commit(
handle,
listPointer,
0
) // Blocks until commiting the offset is done
// -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68
}
}
guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: result)
}
try await client.commitSync(message)
}
}

/// This function is used to gracefully shut down a Kafka consumer client.
///
/// - Note: Invoking this function is not always needed as the ``KafkaConsumer``
/// will already shut down when consumption of the ``KafkaConsumerMessages`` has ended.
private func shutdownGracefully() {
private func triggerGracefulShutdown() {
let action = self.stateMachine.withLockedValue { $0.finish() }
switch action {
case .shutdownGracefullyAndFinishSource(let client, let source):
self._shutdownGracefullyAndFinishSource(
case .triggerGracefulShutdownAndFinishSource(let client, let source):
self._triggerGracefulShutdownAndFinishSource(
client: client,
source: source,
logger: self.logger
Expand All @@ -287,7 +259,7 @@ public final class KafkaConsumer {
}
}

private func _shutdownGracefullyAndFinishSource(
private func _triggerGracefulShutdownAndFinishSource(
client: KafkaClient,
source: Producer.Source,
logger: Logger
Expand Down Expand Up @@ -336,7 +308,12 @@ extension KafkaConsumer {
client: KafkaClient,
source: Producer.Source
)
/// The ``KafkaConsumer`` has been closed.
/// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
/// We are now in the process of commiting our last state to the broker.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case finishing(client: KafkaClient)
/// The ``KafkaConsumer`` is closed.
case finished
}

Expand Down Expand Up @@ -368,6 +345,11 @@ extension KafkaConsumer {
client: KafkaClient,
source: Producer.Source
)
/// The ``KafkaConsumer`` is in the process of closing down, but still needs to poll
/// to commit its state to the broker.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case pollUntilClosed(client: KafkaClient)
/// Terminate the poll loop.
case terminatePollLoop
}
Expand All @@ -376,14 +358,21 @@ extension KafkaConsumer {
/// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken.
///
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
func nextPollLoopAction() -> PollLoopAction {
mutating func nextPollLoopAction() -> PollLoopAction {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .initializing:
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
case .consuming(let client, let source):
return .pollForAndYieldMessage(client: client, source: source)
case .finishing(let client):
if client.isConsumerClosed {
self.state = .finished
return .terminatePollLoop
} else {
return .pollUntilClosed(client: client)
}
case .finished:
return .terminatePollLoop
}
Expand All @@ -409,7 +398,7 @@ extension KafkaConsumer {
source: source
)
return .setUpConnection(client: client)
case .consuming, .finished:
case .consuming, .finishing, .finished:
fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer")
}
}
Expand Down Expand Up @@ -438,7 +427,7 @@ extension KafkaConsumer {
fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
case .consuming(let client, _):
return .commitSync(client: client)
case .finished:
case .finishing, .finished:
return .throwClosedError
}
}
Expand All @@ -449,7 +438,7 @@ extension KafkaConsumer {
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case shutdownGracefullyAndFinishSource(
case triggerGracefulShutdownAndFinishSource(
client: KafkaClient,
source: Producer.Source
)
Expand All @@ -466,12 +455,12 @@ extension KafkaConsumer {
case .initializing:
fatalError("subscribe() / assign() should have been invoked before \(#function)")
case .consuming(let client, let source):
self.state = .finished
return .shutdownGracefullyAndFinishSource(
self.state = .finishing(client: client)
return .triggerGracefulShutdownAndFinishSource(
client: client,
source: source
)
case .finished:
case .finishing, .finished:
return nil
}
}
Expand Down
4 changes: 3 additions & 1 deletion Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ struct RDKafkaConfig {
) -> CapturedClosures {
let closures = CapturedClosures()

// Pass the the reference to Opaque as an opaque object
// Pass the captured closure to the C closure as an opaque object.
// Unretained pass because the reference that librdkafka holds to the captured closures
// should not be counted in ARC as this can lead to memory leaks.
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque()
rd_kafka_conf_set_opaque(
configPointer,
Expand Down

0 comments on commit 588af60

Please sign in to comment.