From 588af601e9a70945a1b9568c80d5d1ac2c26eff1 Mon Sep 17 00:00:00 2001
From: Felix Schlegel
Date: Fri, 30 Jun 2023 13:34:34 +0100
Subject: [PATCH] Fix: `KafkaClient.closeConsumer` should not block (#73)

* Fix: make `KafkaConsumer.commitSync` non-blocking

Motivation:

Currently our invocation of `rd_kafka_commit` inside of
`KafkaConsumer.commitSync` is blocking a cooperative thread. This PR aims
to make `KafkaConsumer.commitSync` non-blocking by using the
callback-based commit API.

Modifications:

* move `commitSync` logic to `KafkaClient`
* replace the blocking invocation of [rd_kafka_commit](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#ab96539928328f14c3c9177ea0c896c87)
  with a callback-based invocation of [rd_kafka_commit_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#af76a6a73baa9c2621536e3f6882a3c1a),
  which is then wrapped inside a `withCheckedThrowingContinuation` statement

* `KafkaClient.consumerClose`: make non-blocking

Motivation:

[rd_kafka_consumer_close](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a37b54d329e12d745889defe96e7d043d)
was blocking. This PR proposes using the
[rd_kafka_consumer_close_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a9dd5c18bdfed81c8847b259f0a8d498d)
API, which is non-blocking and served through the normal poll loop.

Modifications:

* `KafkaClient.consumerClose`: use `rd_kafka_consumer_close_queue` in favour of `rd_kafka_consumer_close`
* create a new variable `KafkaClient.isConsumerClosed` that indicates if the poll loop needs to continue polling or if it can stop running
* update the state management in `KafkaConsumer` to accommodate polling while the `KafkaConsumer` is in the process of closing

Result:

Calling `KafkaClient.consumerClose` no longer blocks.

* Review Franz

Modifications:

* introduce a new `KafkaConsumer.StateMachine.State`, `.finishing`, to avoid retaining `client` in state `.finished`
* rename `KafkaConsumer.shutdownGracefully` to `KafkaConsumer.triggerGracefulShutdown`
* add a note that `KafkaConsumer.commitSync` does not support `Task` cancellation
---
 Sources/SwiftKafka/KafkaClient.swift       | 96 ++++++++++++++++++-
 Sources/SwiftKafka/KafkaConsumer.swift     | 95 ++++++++----------
 .../SwiftKafka/RDKafka/RDKafkaConfig.swift |  4 +-
 3 files changed, 137 insertions(+), 58 deletions(-)

diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift
index 5dca295b..1dfb6876 100644
--- a/Sources/SwiftKafka/KafkaClient.swift
+++ b/Sources/SwiftKafka/KafkaClient.swift
@@ -118,14 +118,102 @@ final class KafkaClient {
         }
     }
 
-    /// Close the consumer.
+    /// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
+    /// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``.
+    final class CapturedCommitCallback {
+        typealias Closure = (Result<Void, KafkaError>) -> Void
+        let closure: Closure
+
+        init(_ closure: @escaping Closure) {
+            self.closure = closure
+        }
+    }
+
+    /// Non-blocking commit of the `message`'s offset to Kafka.
+    ///
+    /// - Parameter message: Last received message that shall be marked as read.
+    func commitSync(_ message: KafkaConsumerMessage) async throws {
+        // Declare the captured closure outside of withCheckedThrowingContinuation.
+        // We do that because we do an unretained pass of the captured closure to
+        // librdkafka, which means we have to keep a reference to the closure
+        // ourselves to make sure it does not get deallocated before
+        // commitSync returns.
+        var capturedClosure: CapturedCommitCallback!
+        try await withCheckedThrowingContinuation { continuation in
+            capturedClosure = CapturedCommitCallback { result in
+                continuation.resume(with: result)
+            }
+
+            // The offset committed is always the offset of the next requested message.
+            // Thus, we increase the offset of the current message by one before committing it.
+            // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
+            let changesList = RDKafkaTopicPartitionList()
+            changesList.setOffset(
+                topic: message.topic,
+                partition: message.partition,
+                offset: Int64(message.offset + 1)
+            )
+
+            // Unretained pass because the reference that librdkafka holds to capturedClosure
+            // should not be counted in ARC as this can lead to memory leaks.
+            let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()
+
+            let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
+
+            // Create a C closure that calls the captured closure
+            let callbackWrapper: (
+                @convention(c) (
+                    OpaquePointer?,
+                    rd_kafka_resp_err_t,
+                    UnsafeMutablePointer<rd_kafka_topic_partition_list_t>?,
+                    UnsafeMutableRawPointer?
+                ) -> Void
+            ) = { _, error, _, opaquePointer in
+                guard let opaquePointer = opaquePointer else {
+                    fatalError("Could not resolve reference to captured Swift callback instance")
+                }
+                let opaque = Unmanaged<CapturedCommitCallback>.fromOpaque(opaquePointer).takeUnretainedValue()
+
+                let actualCallback = opaque.closure
+
+                if error == RD_KAFKA_RESP_ERR_NO_ERROR {
+                    actualCallback(.success(()))
+                } else {
+                    let kafkaError = KafkaError.rdKafkaError(wrapping: error)
+                    actualCallback(.failure(kafkaError))
+                }
+            }
+
+            changesList.withListPointer { listPointer in
+                rd_kafka_commit_queue(
+                    self.kafkaHandle,
+                    listPointer,
+                    consumerQueue,
+                    callbackWrapper,
+                    opaquePointer
+                )
+            }
+        }
+    }
+
+    /// Close the consumer asynchronously. This means revoking its assignment, committing offsets
+    /// to the broker and leaving the consumer group (if applicable).
+    ///
+    /// Make sure to run the poll loop until ``KafkaClient/isConsumerClosed`` returns `true`.
     func consumerClose() throws {
-        let result = rd_kafka_consumer_close(self.kafkaHandle)
-        if result != RD_KAFKA_RESP_ERR_NO_ERROR {
-            throw KafkaError.rdKafkaError(wrapping: result)
+        let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
+        let result = rd_kafka_consumer_close_queue(self.kafkaHandle, consumerQueue)
+        let kafkaError = rd_kafka_error_code(result)
+        if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
+            throw KafkaError.rdKafkaError(wrapping: kafkaError)
         }
     }
 
+    var isConsumerClosed: Bool {
+        rd_kafka_consumer_closed(self.kafkaHandle) == 1
+    }
+
     /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
     /// - Warning: Do not escape the pointer from the closure for later use.
     /// - Parameter body: The closure will use the Kafka handle pointer.
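For reference, the captured-closure bridging used in `commitSync` above can be distilled into a standalone sketch. `CapturedCallback`, `cStyleCommitAPI`, and `CommitError` below are hypothetical stand-ins for `CapturedCommitCallback`, `rd_kafka_commit_queue`, and `KafkaError`; this is a minimal model of the pattern, not the package's implementation:

```swift
// Box that carries a Swift closure across a C boundary as a raw pointer.
final class CapturedCallback {
    typealias Closure = (Result<Void, Error>) -> Void
    let closure: Closure

    init(_ closure: @escaping Closure) {
        self.closure = closure
    }
}

// Hypothetical stand-in for a callback-based C API such as rd_kafka_commit_queue:
// it invokes `callback` exactly once with an error code and the opaque pointer.
func cStyleCommitAPI(
    _ callback: @convention(c) (Int32, UnsafeMutableRawPointer?) -> Void,
    _ opaque: UnsafeMutableRawPointer?
) {
    callback(0, opaque) // 0 signals success here
}

struct CommitError: Error {}

func commit() async throws {
    // Keep a strong reference outside the continuation: the C side only
    // receives an unretained pointer, so ARC must not free the box early.
    var captured: CapturedCallback!
    try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
        captured = CapturedCallback { result in
            continuation.resume(with: result)
        }
        let opaque = Unmanaged.passUnretained(captured).toOpaque()
        cStyleCommitAPI({ errorCode, opaque in
            // A @convention(c) closure cannot capture Swift context,
            // so the box is recovered from the opaque pointer instead.
            let box = Unmanaged<CapturedCallback>.fromOpaque(opaque!).takeUnretainedValue()
            box.closure(errorCode == 0 ? .success(()) : .failure(CommitError()))
        }, opaque)
    }
    // Explicitly keep the box alive until after the callback has fired.
    withExtendedLifetime(captured) {}
}
```

As with `commitSync`, the continuation is resumed from inside the C callback, which is why cancelling the surrounding `Task` cannot interrupt a pending commit.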
diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift
index ca908380..c039c65c 100644
--- a/Sources/SwiftKafka/KafkaConsumer.swift
+++ b/Sources/SwiftKafka/KafkaConsumer.swift
@@ -40,10 +40,10 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
     }
 
     func didTerminate() {
-        // Duplicate of _shutdownGracefully
+        // Duplicate of _triggerGracefulShutdown
        let action = self.stateMachine.withLockedValue { $0.finish() }
        switch action {
-        case .shutdownGracefullyAndFinishSource(let client, let source):
+        case .triggerGracefulShutdownAndFinishSource(let client, let source):
            source.finish()
 
            do {
@@ -151,7 +151,7 @@
     }
 
     deinit {
-        self.shutdownGracefully()
+        self.triggerGracefulShutdown()
     }
 
     /// Subscribe to the given list of `topics`.
@@ -201,16 +201,20 @@
            switch nextAction {
            case .pollForAndYieldMessage(let client, let source):
                do {
-                    guard let message = try client.consumerPoll() else {
-                        break
+                    if let message = try client.consumerPoll() {
+                        // We do not support back pressure, we can ignore the yield result
+                        _ = source.yield(message)
                    }
-                    // We do not support back pressure, we can ignore the yield result
-                    _ = source.yield(message)
                } catch {
                    source.finish()
                    throw error
                }
                try await Task.sleep(for: self.config.pollInterval)
+            case .pollUntilClosed(let client):
+                // Ignore poll result, we are closing down and just polling to commit
+                // outstanding consumer state
+                _ = try client.consumerPoll()
+                try await Task.sleep(for: self.config.pollInterval)
            case .terminatePollLoop:
                return
            }
@@ -222,18 +226,8 @@
     /// - Parameter message: Last received message that shall be marked as read.
     /// - Throws: A ``KafkaError`` if committing failed.
     /// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`.
+    /// - Important: This method does not support `Task` cancellation.
     public func commitSync(_ message: KafkaConsumerMessage) async throws {
-        try await withCheckedThrowingContinuation { continuation in
-            do {
-                try self._commitSync(message) // Blocks until commiting the offset is done
-                continuation.resume()
-            } catch {
-                continuation.resume(throwing: error)
-            }
-        }
-    }
-
-    private func _commitSync(_ message: KafkaConsumerMessage) throws {
        let action = self.stateMachine.withLockedValue { $0.commitSync() }
        switch action {
        case .throwClosedError:
@@ -243,29 +237,7 @@
            throw KafkaError.client(reason: "Client is closed")
        case .commitSync(let client):
            guard self.config.enableAutoCommit == false else {
                throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false")
            }
 
-            // The offset committed is always the offset of the next requested message.
-            // Thus, we increase the offset of the current message by one before committing it.
-            // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
-            let changesList = RDKafkaTopicPartitionList()
-            changesList.setOffset(
-                topic: message.topic,
-                partition: message.partition,
-                offset: Int64(message.offset + 1)
-            )
-
-            let result = client.withKafkaHandlePointer { handle in
-                changesList.withListPointer { listPointer in
-                    rd_kafka_commit(
-                        handle,
-                        listPointer,
-                        0
-                    ) // Blocks until commiting the offset is done
-                    // -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68
-                }
-            }
-            guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
-                throw KafkaError.rdKafkaError(wrapping: result)
-            }
+            try await client.commitSync(message)
        }
    }
@@ -273,11 +245,11 @@ public final class KafkaConsumer {
    ///
    /// - Note: Invoking this function is not always needed as the ``KafkaConsumer``
    /// will already shut down when consumption of the ``KafkaConsumerMessages`` has ended.
-    private func shutdownGracefully() {
+    private func triggerGracefulShutdown() {
        let action = self.stateMachine.withLockedValue { $0.finish() }
        switch action {
-        case .shutdownGracefullyAndFinishSource(let client, let source):
-            self._shutdownGracefullyAndFinishSource(
+        case .triggerGracefulShutdownAndFinishSource(let client, let source):
+            self._triggerGracefulShutdownAndFinishSource(
                client: client,
                source: source,
                logger: self.logger
@@ -287,7 +259,7 @@
        }
    }
 
-    private func _shutdownGracefullyAndFinishSource(
+    private func _triggerGracefulShutdownAndFinishSource(
        client: KafkaClient,
        source: Producer.Source,
        logger: Logger
@@ -336,7 +308,12 @@ extension KafkaConsumer {
            client: KafkaClient,
            source: Producer.Source
        )
-        /// The ``KafkaConsumer`` has been closed.
+        /// ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
+        /// We are now in the process of committing our last state to the broker.
+        ///
+        /// - Parameter client: Client used for handling the connection to the Kafka cluster.
+        case finishing(client: KafkaClient)
+        /// The ``KafkaConsumer`` is closed.
        case finished
    }
@@ -368,6 +345,11 @@
            client: KafkaClient,
            source: Producer.Source
        )
+        /// The ``KafkaConsumer`` is in the process of closing down, but still needs to poll
+        /// to commit its state to the broker.
+        ///
+        /// - Parameter client: Client used for handling the connection to the Kafka cluster.
+        case pollUntilClosed(client: KafkaClient)
        /// Terminate the poll loop.
        case terminatePollLoop
    }
 
        /// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken.
        ///
        /// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
-        func nextPollLoopAction() -> PollLoopAction {
+        mutating func nextPollLoopAction() -> PollLoopAction {
            switch self.state {
            case .uninitialized:
                fatalError("\(#function) invoked while still in state \(self.state)")
            case .initializing:
                fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
            case .consuming(let client, let source):
                return .pollForAndYieldMessage(client: client, source: source)
+            case .finishing(let client):
+                if client.isConsumerClosed {
+                    self.state = .finished
+                    return .terminatePollLoop
+                } else {
+                    return .pollUntilClosed(client: client)
+                }
            case .finished:
                return .terminatePollLoop
            }
@@ -409,7 +398,7 @@
                    source: source
                )
                return .setUpConnection(client: client)
-            case .consuming, .finished:
+            case .consuming, .finishing, .finished:
                fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer")
            }
        }
@@ -438,7 +427,7 @@
                fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
            case .consuming(let client, _):
                return .commitSync(client: client)
-            case .finished:
+            case .finishing, .finished:
                return .throwClosedError
            }
        }
@@ -449,7 +438,7 @@
        ///
        /// - Parameter client: Client used for handling the connection to the Kafka cluster.
        /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
-        case shutdownGracefullyAndFinishSource(
+        case triggerGracefulShutdownAndFinishSource(
            client: KafkaClient,
            source: Producer.Source
        )
@@ -466,12 +455,12 @@
            case .initializing:
                fatalError("subscribe() / assign() should have been invoked before \(#function)")
            case .consuming(let client, let source):
-                self.state = .finished
-                return .shutdownGracefullyAndFinishSource(
+                self.state = .finishing(client: client)
+                return .triggerGracefulShutdownAndFinishSource(
                    client: client,
                    source: source
                )
-            case .finished:
+            case .finishing, .finished:
                return nil
            }
        }

diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift b/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift
index 398985ff..eee3ff7e 100644
--- a/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift
+++ b/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift
@@ -78,7 +78,9 @@ struct RDKafkaConfig {
    ) -> CapturedClosures {
        let closures = CapturedClosures()
 
-        // Pass the the reference to Opaque as an opaque object
+        // Pass the captured closure to the C closure as an opaque object.
+        // Unretained pass because the reference that librdkafka holds to the captured closures
+        // should not be counted in ARC as this can lead to memory leaks.
        let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque()
        rd_kafka_conf_set_opaque(
            configPointer,
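For reference, the shutdown flow that the new `.finishing` state enables can be modeled in a few lines. The following is a simplified sketch, not the package's `StateMachine`: `isConsumerClosed` stands in for `KafkaClient.isConsumerClosed` (backed by `rd_kafka_consumer_closed`), and message handling is elided:

```swift
// Simplified model of the shutdown-aware poll loop introduced by this patch.
enum PollLoopAction: Equatable {
    case pollForAndYieldMessage
    case pollUntilClosed
    case terminatePollLoop
}

enum State {
    case consuming
    case finishing
    case finished
}

struct StateMachine {
    var state: State = .consuming
    // Stand-in for KafkaClient.isConsumerClosed (rd_kafka_consumer_closed).
    var isConsumerClosed = false

    mutating func nextPollLoopAction() -> PollLoopAction {
        switch self.state {
        case .consuming:
            return .pollForAndYieldMessage
        case .finishing:
            // Keep polling so outstanding commits are flushed and group
            // membership is revoked; terminate only once librdkafka
            // reports the consumer as closed.
            if self.isConsumerClosed {
                self.state = .finished
                return .terminatePollLoop
            }
            return .pollUntilClosed
        case .finished:
            return .terminatePollLoop
        }
    }

    mutating func finish() {
        // triggerGracefulShutdown() moves .consuming to .finishing rather
        // than straight to .finished, so the client stays retained while
        // the asynchronous close is still in flight.
        if case .consuming = self.state {
            self.state = .finishing
        }
    }
}

// Example: drive the loop until the shutdown completes.
var machine = StateMachine()
machine.finish()                 // a graceful shutdown was triggered
machine.isConsumerClosed = true  // librdkafka finished closing
precondition(machine.nextPollLoopAction() == .terminatePollLoop)
```

This makes the purpose of `.finishing` visible: the poll loop stays alive exactly as long as `rd_kafka_consumer_close_queue` needs it, which is what lets `consumerClose` return without blocking.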