From 9268027719fa0f8e57ebe2b462b9a3e615106e1d Mon Sep 17 00:00:00 2001 From: mr-swifter <103502437+mr-swifter@users.noreply.github.com> Date: Thu, 29 Jun 2023 16:13:02 +0300 Subject: [PATCH] Add an option to redirect librdkafka logging to Logger (#61) * Add an option to redirect librdkafka logging to Logger * address PR feedback * polishes --- .gitignore | 3 +- Sources/SwiftKafka/KafkaClient.swift | 4 +- Sources/SwiftKafka/KafkaProducer.swift | 4 +- Sources/SwiftKafka/RDKafka/RDKafka.swift | 19 +-- .../SwiftKafka/RDKafka/RDKafkaConfig.swift | 121 ++++++++++++++---- 5 files changed, 114 insertions(+), 37 deletions(-) diff --git a/.gitignore b/.gitignore index 2fc5b3b2..734831f0 100644 --- a/.gitignore +++ b/.gitignore @@ -4,8 +4,7 @@ /*.xcodeproj xcuserdata/ DerivedData/ -.swiftpm/config/registries.json -.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata .netrc Package.resolved .*.sw? +.swiftpm diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index ff1b84d5..5dca295b 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -24,13 +24,13 @@ final class KafkaClient { /// Handle for the C library's Kafka instance. private let kafkaHandle: OpaquePointer /// References the opaque object passed to the config to ensure ARC retains it as long as the client exists. - private let opaque: RDKafkaConfig.CapturedClosure? + private let opaque: RDKafkaConfig.CapturedClosures /// A logger. private let logger: Logger init( kafkaHandle: OpaquePointer, - opaque: RDKafkaConfig.CapturedClosure?, + opaque: RDKafkaConfig.CapturedClosures, logger: Logger ) { self.kafkaHandle = kafkaHandle diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 306a18df..69c18782 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -110,7 +110,7 @@ public actor KafkaProducer { configDictionary: config.dictionary, // Having no callback will discard any incoming acknowledgement messages // Ref: rdkafka_broker.c:rd_kafka_dr_msgq - callback: nil, + deliveryReportCallback: nil, logger: logger ) @@ -149,7 +149,7 @@ public actor KafkaProducer { let client = try RDKafka.createClient( type: .producer, configDictionary: config.dictionary, - callback: { [logger, streamContinuation] messageResult in + deliveryReportCallback: { [logger, streamContinuation] messageResult in guard let messageResult else { logger.error("Could not resolve acknowledged message") return diff --git a/Sources/SwiftKafka/RDKafka/RDKafka.swift b/Sources/SwiftKafka/RDKafka/RDKafka.swift index 6c189c33..79584758 100644 --- a/Sources/SwiftKafka/RDKafka/RDKafka.swift +++ b/Sources/SwiftKafka/RDKafka/RDKafka.swift @@ -27,21 +27,24 @@ struct RDKafka { static func createClient( type: ClientType, configDictionary: [String: String], - callback: ((RDKafkaConfig.KafkaAcknowledgementResult?) -> Void)? = nil, + deliveryReportCallback: RDKafkaConfig.CapturedClosures.DeliveryReportClosure? = nil, logger: Logger ) throws -> KafkaClient { let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary) - let closurePointer: RDKafkaConfig.CapturedClosure? - if let callback { - // CapturedClosure must be retained by KafkaClient as long as message acknowledgements are received - closurePointer = RDKafkaConfig.setDeliveryReportCallback(configPointer: rdConfig, callback) - } else { - closurePointer = nil + // Check that delivery report callback can be only set for producer + guard deliveryReportCallback == nil || type == .producer else { + fatalError("Delivery report callback can't be defined for consumer client") } + let opaque = RDKafkaConfig.setCallbackClosures( + configPointer: rdConfig, + deliveryReportCallback: deliveryReportCallback, + logger: logger + ) + let errorChars = UnsafeMutablePointer.allocate(capacity: KafkaClient.stringSize) defer { errorChars.deallocate() } @@ -58,6 +61,6 @@ struct RDKafka { throw KafkaError.client(reason: errorString) } - return KafkaClient(kafkaHandle: handle, opaque: closurePointer, logger: logger) + return KafkaClient(kafkaHandle: handle, opaque: opaque, logger: logger) } } diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift b/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift index d285e277..4173be82 100644 --- a/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift +++ b/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift @@ -13,18 +13,20 @@ //===----------------------------------------------------------------------===// import Crdkafka +import Logging /// A collection of helper functions wrapping common `rd_kafka_conf_*` functions in Swift. struct RDKafkaConfig { typealias KafkaAcknowledgementResult = Result /// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`. - final class CapturedClosure { - typealias Closure = (KafkaAcknowledgementResult?) -> Void - let closure: Closure + final class CapturedClosures { + typealias DeliveryReportClosure = (KafkaAcknowledgementResult?) -> Void + var deliveryReportClosure: DeliveryReportClosure? - init(_ closure: @escaping Closure) { - self.closure = closure - } + typealias LoggingClosure = (Int32, UnsafePointer, UnsafePointer) -> Void + var loggingClosure: LoggingClosure? + + init() { } } /// Create a new `rd_kafka_conf_t` object in memory and initialize it with the given configuration properties. @@ -63,46 +65,119 @@ struct RDKafkaConfig { } } - /// A Swift wrapper for `rd_kafka_conf_set_dr_msg_cb`. - /// Defines a function that is called upon every message acknowledgement. + /// Registers passed closures as callbacks and sets the application's opaque pointer that will be passed to callbacks + /// - Parameter type: Kafka client type: `Consumer` or `Producer` /// - Parameter configPointer: An `OpaquePointer` pointing to the `rd_kafka_conf_t` object in memory. - /// - Parameter callback: A closure that is invoked upon message acknowledgement. - /// - Returns: A ``CapturedClosure`` object that must me retained by the caller as long as acknowledgements are received. - static func setDeliveryReportCallback( + /// - Parameter deliveryReportCallback: A closure that is invoked upon message acknowledgement. + /// - Parameter logger: Logger instance + /// - Returns: A ``CapturedClosures`` object that must me retained by the caller as long as it exists. + static func setCallbackClosures( configPointer: OpaquePointer, - _ callback: @escaping ((KafkaAcknowledgementResult?) -> Void) - ) -> CapturedClosure { - let capturedClosure = CapturedClosure(callback) - // Pass the captured closure to the C closure as an opaque object - let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque() + deliveryReportCallback: CapturedClosures.DeliveryReportClosure? = nil, + logger: Logger + ) -> CapturedClosures { + let closures = CapturedClosures() + + // Pass the the reference to Opaque as an opaque object + let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque() rd_kafka_conf_set_opaque( configPointer, opaquePointer ) + // Set delivery report callback + if let deliveryReportCallback { + Self.setDeliveryReportCallback(configPointer: configPointer, capturedClosures: closures, deliveryReportCallback) + } + // Set logging callback + Self.setLoggingCallback(configPointer: configPointer, capturedClosures: closures, logger: logger) + + return closures + } + + /// A Swift wrapper for `rd_kafka_conf_set_dr_msg_cb`. + /// Defines a function that is called upon every message acknowledgement. + /// - Parameter configPointer: An `OpaquePointer` pointing to the `rd_kafka_conf_t` object in memory. + /// - Parameter callback: A closure that is invoked upon message acknowledgement. + private static func setDeliveryReportCallback( + configPointer: OpaquePointer, + capturedClosures: CapturedClosures, + _ deliveryReportCallback: @escaping RDKafkaConfig.CapturedClosures.DeliveryReportClosure + ) { + capturedClosures.deliveryReportClosure = deliveryReportCallback + // Create a C closure that calls the captured closure let callbackWrapper: ( @convention(c) (OpaquePointer?, UnsafePointer?, UnsafeMutableRawPointer?) -> Void ) = { _, messagePointer, opaquePointer in guard let opaquePointer = opaquePointer else { - fatalError("Could not resolve reference to KafkaProducer instance") + fatalError("Could not resolve reference to CapturedClosures") } - let opaque = Unmanaged.fromOpaque(opaquePointer).takeUnretainedValue() + let closures = Unmanaged.fromOpaque(opaquePointer).takeUnretainedValue() - let actualCallback = opaque.closure + guard let actualCallback = closures.deliveryReportClosure else { + fatalError("Delivery report callback is set, but user closure is not defined") + } let messageResult = Self.convertMessageToAcknowledgementResult(messagePointer: messagePointer) actualCallback(messageResult) - - // The messagePointer is automatically destroyed by librdkafka - // For safety reasons, we only use it inside of this callback } rd_kafka_conf_set_dr_msg_cb( configPointer, callbackWrapper ) + } + + /// A Swift wrapper for `rd_kafka_conf_set_log_cb`. + /// Defines a function that is called upon every log and redirects output to ``logger``. + /// - Parameter configPointer: An `OpaquePointer` pointing to the `rd_kafka_conf_t` object in memory. + /// - Parameter logger: Logger instance + private static func setLoggingCallback( + configPointer: OpaquePointer, + capturedClosures: CapturedClosures, + logger: Logger + ) { + let loggingClosure: RDKafkaConfig.CapturedClosures.LoggingClosure = { level, fac, buf in + // Mapping according to https://en.wikipedia.org/wiki/Syslog + switch level { + case 0 ... 2: /* Emergency, Alert, Critical */ + logger.critical(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac)) + case 3: /* Error */ + logger.error(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac)) + case 4: /* Warning */ + logger.warning(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac)) + case 5: /* Notice */ + logger.notice(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac)) + case 6: /* Informational */ + logger.info(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac)) + default: /* Debug */ + logger.debug(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac)) + } + } + capturedClosures.loggingClosure = loggingClosure + + let loggingWrapper: ( + @convention(c) (OpaquePointer?, Int32, UnsafePointer?, UnsafePointer?) -> Void + ) = { rkKafkaT, level, fac, buf in + guard let fac, let buf else { + return + } + + guard let opaquePointer = rd_kafka_opaque(rkKafkaT) else { + fatalError("Could not resolve reference to CapturedClosures") + } + let opaque = Unmanaged.fromOpaque(opaquePointer).takeUnretainedValue() + + guard let closure = opaque.loggingClosure else { + fatalError("Could not resolve logger instance") + } + closure(level, fac, buf) + } - return capturedClosure + rd_kafka_conf_set_log_cb( + configPointer, + loggingWrapper + ) } /// Convert an unsafe`rd_kafka_message_t` object to a safe ``KafkaAcknowledgementResult``.