Skip to content

Commit

Permalink
Add an option to redirect librdkafka logging to Logger (#61)
Browse files Browse the repository at this point in the history
* Add an option to redirect librdkafka logging to Logger

* address PR feedback

* polishes
  • Loading branch information
mr-swifter authored Jun 29, 2023
1 parent b0bc014 commit 9268027
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 37 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
/*.xcodeproj
xcuserdata/
DerivedData/
.swiftpm/config/registries.json
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
.netrc
Package.resolved
.*.sw?
.swiftpm
4 changes: 2 additions & 2 deletions Sources/SwiftKafka/KafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ final class KafkaClient {
/// Handle for the C library's Kafka instance.
private let kafkaHandle: OpaquePointer
/// References the opaque object passed to the config to ensure ARC retains it as long as the client exists.
private let opaque: RDKafkaConfig.CapturedClosure?
private let opaque: RDKafkaConfig.CapturedClosures
/// A logger.
private let logger: Logger

init(
kafkaHandle: OpaquePointer,
opaque: RDKafkaConfig.CapturedClosure?,
opaque: RDKafkaConfig.CapturedClosures,
logger: Logger
) {
self.kafkaHandle = kafkaHandle
Expand Down
4 changes: 2 additions & 2 deletions Sources/SwiftKafka/KafkaProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public actor KafkaProducer {
configDictionary: config.dictionary,
// Having no callback will discard any incoming acknowledgement messages
// Ref: rdkafka_broker.c:rd_kafka_dr_msgq
callback: nil,
deliveryReportCallback: nil,
logger: logger
)

Expand Down Expand Up @@ -149,7 +149,7 @@ public actor KafkaProducer {
let client = try RDKafka.createClient(
type: .producer,
configDictionary: config.dictionary,
callback: { [logger, streamContinuation] messageResult in
deliveryReportCallback: { [logger, streamContinuation] messageResult in
guard let messageResult else {
logger.error("Could not resolve acknowledged message")
return
Expand Down
19 changes: 11 additions & 8 deletions Sources/SwiftKafka/RDKafka/RDKafka.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@ struct RDKafka {
static func createClient(
type: ClientType,
configDictionary: [String: String],
callback: ((RDKafkaConfig.KafkaAcknowledgementResult?) -> Void)? = nil,
deliveryReportCallback: RDKafkaConfig.CapturedClosures.DeliveryReportClosure? = nil,
logger: Logger
) throws -> KafkaClient {
let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER

let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary)

let closurePointer: RDKafkaConfig.CapturedClosure?
if let callback {
// CapturedClosure must be retained by KafkaClient as long as message acknowledgements are received
closurePointer = RDKafkaConfig.setDeliveryReportCallback(configPointer: rdConfig, callback)
} else {
closurePointer = nil
// Check that delivery report callback can be only set for producer
guard deliveryReportCallback == nil || type == .producer else {
fatalError("Delivery report callback can't be defined for consumer client")
}

let opaque = RDKafkaConfig.setCallbackClosures(
configPointer: rdConfig,
deliveryReportCallback: deliveryReportCallback,
logger: logger
)

let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
defer { errorChars.deallocate() }

Expand All @@ -58,6 +61,6 @@ struct RDKafka {
throw KafkaError.client(reason: errorString)
}

return KafkaClient(kafkaHandle: handle, opaque: closurePointer, logger: logger)
return KafkaClient(kafkaHandle: handle, opaque: opaque, logger: logger)
}
}
121 changes: 98 additions & 23 deletions Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@
//===----------------------------------------------------------------------===//

import Crdkafka
import Logging

/// A collection of helper functions wrapping common `rd_kafka_conf_*` functions in Swift.
struct RDKafkaConfig {
typealias KafkaAcknowledgementResult = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
final class CapturedClosure {
typealias Closure = (KafkaAcknowledgementResult?) -> Void
let closure: Closure
final class CapturedClosures {
typealias DeliveryReportClosure = (KafkaAcknowledgementResult?) -> Void
var deliveryReportClosure: DeliveryReportClosure?

init(_ closure: @escaping Closure) {
self.closure = closure
}
typealias LoggingClosure = (Int32, UnsafePointer<CChar>, UnsafePointer<CChar>) -> Void
var loggingClosure: LoggingClosure?

init() { }
}

/// Create a new `rd_kafka_conf_t` object in memory and initialize it with the given configuration properties.
Expand Down Expand Up @@ -63,46 +65,119 @@ struct RDKafkaConfig {
}
}

/// A Swift wrapper for `rd_kafka_conf_set_dr_msg_cb`.
/// Defines a function that is called upon every message acknowledgement.
/// Registers passed closures as callbacks and sets the application's opaque pointer that will be passed to callbacks
/// - Parameter type: Kafka client type: `Consumer` or `Producer`
/// - Parameter configPointer: An `OpaquePointer` pointing to the `rd_kafka_conf_t` object in memory.
/// - Parameter callback: A closure that is invoked upon message acknowledgement.
/// - Returns: A ``CapturedClosure`` object that must me retained by the caller as long as acknowledgements are received.
static func setDeliveryReportCallback(
/// - Parameter deliveryReportCallback: A closure that is invoked upon message acknowledgement.
/// - Parameter logger: Logger instance
/// - Returns: A ``CapturedClosures`` object that must me retained by the caller as long as it exists.
static func setCallbackClosures(
configPointer: OpaquePointer,
_ callback: @escaping ((KafkaAcknowledgementResult?) -> Void)
) -> CapturedClosure {
let capturedClosure = CapturedClosure(callback)
// Pass the captured closure to the C closure as an opaque object
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()
deliveryReportCallback: CapturedClosures.DeliveryReportClosure? = nil,
logger: Logger
) -> CapturedClosures {
let closures = CapturedClosures()

// Pass the the reference to Opaque as an opaque object
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque()
rd_kafka_conf_set_opaque(
configPointer,
opaquePointer
)

// Set delivery report callback
if let deliveryReportCallback {
Self.setDeliveryReportCallback(configPointer: configPointer, capturedClosures: closures, deliveryReportCallback)
}
// Set logging callback
Self.setLoggingCallback(configPointer: configPointer, capturedClosures: closures, logger: logger)

return closures
}

/// A Swift wrapper for `rd_kafka_conf_set_dr_msg_cb`.
/// Defines a function that is called upon every message acknowledgement.
/// - Parameter configPointer: An `OpaquePointer` pointing to the `rd_kafka_conf_t` object in memory.
/// - Parameter callback: A closure that is invoked upon message acknowledgement.
private static func setDeliveryReportCallback(
configPointer: OpaquePointer,
capturedClosures: CapturedClosures,
_ deliveryReportCallback: @escaping RDKafkaConfig.CapturedClosures.DeliveryReportClosure
) {
capturedClosures.deliveryReportClosure = deliveryReportCallback

// Create a C closure that calls the captured closure
let callbackWrapper: (
@convention(c) (OpaquePointer?, UnsafePointer<rd_kafka_message_t>?, UnsafeMutableRawPointer?) -> Void
) = { _, messagePointer, opaquePointer in
guard let opaquePointer = opaquePointer else {
fatalError("Could not resolve reference to KafkaProducer instance")
fatalError("Could not resolve reference to CapturedClosures")
}
let opaque = Unmanaged<CapturedClosure>.fromOpaque(opaquePointer).takeUnretainedValue()
let closures = Unmanaged<CapturedClosures>.fromOpaque(opaquePointer).takeUnretainedValue()

let actualCallback = opaque.closure
guard let actualCallback = closures.deliveryReportClosure else {
fatalError("Delivery report callback is set, but user closure is not defined")
}
let messageResult = Self.convertMessageToAcknowledgementResult(messagePointer: messagePointer)
actualCallback(messageResult)

// The messagePointer is automatically destroyed by librdkafka
// For safety reasons, we only use it inside of this callback
}

rd_kafka_conf_set_dr_msg_cb(
configPointer,
callbackWrapper
)
}

/// A Swift wrapper for `rd_kafka_conf_set_log_cb`.
/// Defines a function that is called upon every log and redirects output to ``logger``.
/// - Parameter configPointer: An `OpaquePointer` pointing to the `rd_kafka_conf_t` object in memory.
/// - Parameter logger: Logger instance
private static func setLoggingCallback(
configPointer: OpaquePointer,
capturedClosures: CapturedClosures,
logger: Logger
) {
let loggingClosure: RDKafkaConfig.CapturedClosures.LoggingClosure = { level, fac, buf in
// Mapping according to https://en.wikipedia.org/wiki/Syslog
switch level {
case 0 ... 2: /* Emergency, Alert, Critical */
logger.critical(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
case 3: /* Error */
logger.error(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
case 4: /* Warning */
logger.warning(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
case 5: /* Notice */
logger.notice(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
case 6: /* Informational */
logger.info(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
default: /* Debug */
logger.debug(Logger.Message(stringLiteral: String(cString: buf)), source: String(cString: fac))
}
}
capturedClosures.loggingClosure = loggingClosure

let loggingWrapper: (
@convention(c) (OpaquePointer?, Int32, UnsafePointer<CChar>?, UnsafePointer<CChar>?) -> Void
) = { rkKafkaT, level, fac, buf in
guard let fac, let buf else {
return
}

guard let opaquePointer = rd_kafka_opaque(rkKafkaT) else {
fatalError("Could not resolve reference to CapturedClosures")
}
let opaque = Unmanaged<CapturedClosures>.fromOpaque(opaquePointer).takeUnretainedValue()

guard let closure = opaque.loggingClosure else {
fatalError("Could not resolve logger instance")
}
closure(level, fac, buf)
}

return capturedClosure
rd_kafka_conf_set_log_cb(
configPointer,
loggingWrapper
)
}

/// Convert an unsafe`rd_kafka_message_t` object to a safe ``KafkaAcknowledgementResult``.
Expand Down

0 comments on commit 9268027

Please sign in to comment.