diff --git a/Package.swift b/Package.swift index 33d6397c..aac18bf8 100644 --- a/Package.swift +++ b/Package.swift @@ -47,6 +47,7 @@ let package = Package( // The zstd Swift package produces warnings that we cannot resolve: // https://github.com/facebook/zstd/issues/3328 .package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"), + .package(url: "https://github.com/swift-extras/swift-extras-json.git", .upToNextMajor(from: "0.6.0")), ], targets: [ .target( @@ -76,6 +77,7 @@ let package = Package( .product(name: "NIOCore", package: "swift-nio"), .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), .product(name: "Logging", package: "swift-log"), + .product(name: "ExtrasJSON", package: "swift-extras-json"), ] ), .systemLibrary( diff --git a/Sources/SwiftKafka/Configuration/KafkaConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaConfiguration.swift index e78bd062..1be45a5a 100644 --- a/Sources/SwiftKafka/Configuration/KafkaConfiguration.swift +++ b/Sources/SwiftKafka/Configuration/KafkaConfiguration.swift @@ -206,3 +206,10 @@ public enum KafkaConfiguration { public static let v6 = IPAddressFamily(description: "v6") } } + +extension Duration { + // Calculated total milliseconds + internal var totalMilliseconds: Int64 { + self.components.seconds * 1000 + self.components.attoseconds / 1_000_000_000_000_000 + } +} diff --git a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift index 6130c03e..4fc6e2c4 100644 --- a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift +++ b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift @@ -20,6 +20,11 @@ public struct KafkaProducerConfiguration { /// Default: `.milliseconds(100)` public var pollInterval: Duration = .milliseconds(100) + /// Interval for librdkafka statistics reports + /// 0ms - disabled + /// >= 1ms - statistics provided every specified interval + public var statisticsInterval: Duration = .zero + /// Maximum timeout for flushing outstanding produce requests when the ``KakfaProducer`` is shutting down. /// Default: `10000` public var flushTimeoutMilliseconds: Int = 10000 { @@ -107,6 +112,12 @@ extension KafkaProducerConfiguration { internal var dictionary: [String: String] { var resultDict: [String: String] = [:] + // we only check that it is 0 or >=1 ms, librdkafka checks for negativity + // in both debug and release + // FIXME: should we make `get throws` and throw exception instead of assert? + assert(self.statisticsInterval == .zero || self.statisticsInterval > Duration.milliseconds(1), "Statistics interval must be expressed in milliseconds") + resultDict["statistics.interval.ms"] = String(self.statisticsInterval.totalMilliseconds) + resultDict["enable.idempotence"] = String(self.enableIdempotence) resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages) resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes) diff --git a/Sources/SwiftKafka/KafkaProducerEvent.swift b/Sources/SwiftKafka/KafkaProducerEvent.swift index 8afbf8e8..f2b88706 100644 --- a/Sources/SwiftKafka/KafkaProducerEvent.swift +++ b/Sources/SwiftKafka/KafkaProducerEvent.swift @@ -16,6 +16,8 @@ public enum KafkaProducerEvent: Sendable, Hashable { /// A collection of delivery reports received from the Kafka cluster indicating the status of produced messages. case deliveryReports([KafkaDeliveryReport]) + /// Statistics from librdkafka + case statistics(KafkaStatistics) /// - Important: Always provide a `default` case when switching over this `enum`. case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY @@ -23,6 +25,8 @@ public enum KafkaProducerEvent: Sendable, Hashable { switch event { case .deliveryReport(results: let results): self = .deliveryReports(results) + case .statistics(let stat): + self = .statistics(stat) case .consumerMessages: fatalError("Cannot cast \(event) to KafkaProducerEvent") } diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift index a33620b8..8aef17b4 100644 --- a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift +++ b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift @@ -136,6 +136,7 @@ final class RDKafkaClient: Sendable { enum KafkaEvent { case deliveryReport(results: [KafkaDeliveryReport]) case consumerMessages(result: Result) + case statistics(KafkaStatistics) } /// Poll the event `rd_kafka_queue_t` for new events. @@ -166,6 +167,8 @@ final class RDKafkaClient: Sendable { self.handleLogEvent(event) case .offsetCommit: self.handleOffsetCommitEvent(event) + case .statistics: + events.append(self.handleStatistics(event)) case .none: // Finished reading events, return early return events @@ -217,6 +220,11 @@ final class RDKafkaClient: Sendable { // The returned message(s) MUST NOT be freed with rd_kafka_message_destroy(). } + private func handleStatistics(_ event: OpaquePointer?) -> KafkaEvent { + let jsonStr = String(cString: rd_kafka_event_stats(event)) + return .statistics(KafkaStatistics(jsonString: jsonStr)) + } + /// Handle event of type `RDKafkaEvent.log`. /// /// - Parameter event: Pointer to underlying `rd_kafka_event_t`. diff --git a/Sources/SwiftKafka/Utilities/KafkaStatistics.swift b/Sources/SwiftKafka/Utilities/KafkaStatistics.swift new file mode 100644 index 00000000..2f9be9f2 --- /dev/null +++ b/Sources/SwiftKafka/Utilities/KafkaStatistics.swift @@ -0,0 +1,25 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-gsoc open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import ExtrasJSON + +public struct KafkaStatistics: Sendable, Hashable { + public let jsonString: String + + public var json: KafkaStatisticsJson { + get throws { + return try XJSONDecoder().decode(KafkaStatisticsJson.self, from: self.jsonString.utf8) + } + } +} diff --git a/Sources/SwiftKafka/Utilities/KafkaStatisticsJsonModel.swift b/Sources/SwiftKafka/Utilities/KafkaStatisticsJsonModel.swift new file mode 100644 index 00000000..23217a62 --- /dev/null +++ b/Sources/SwiftKafka/Utilities/KafkaStatisticsJsonModel.swift @@ -0,0 +1,177 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-gsoc open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +// This file was generated from JSON Schema using quicktype, do not modify it directly. +// To parse the JSON, add this file to your project and do: +// +// let statistics = try? newJSONDecoder().decode(KafkaStatisticsJsonModel.self, from: jsonData) + +// MARK: - Statistics + +public struct KafkaStatisticsJson: Hashable, Codable { + let name, clientID, type: String? + let ts, time, age, replyq: Int? + let msgCnt, msgSize, msgMax, msgSizeMax: Int? + let simpleCnt, metadataCacheCnt: Int? + let brokers: [String: Broker]? + let topics: [String: Topic]? + let cgrp: Cgrp? + let tx, txBytes, rx, rxBytes: Int? + let txmsgs, txmsgBytes, rxmsgs, rxmsgBytes: Int? + + enum CodingKeys: String, CodingKey { + case name + case clientID = "client_id" + case type, ts, time, age, replyq + case msgCnt = "msg_cnt" + case msgSize = "msg_size" + case msgMax = "msg_max" + case msgSizeMax = "msg_size_max" + case simpleCnt = "simple_cnt" + case metadataCacheCnt = "metadata_cache_cnt" + case brokers, topics, cgrp, tx + case txBytes = "tx_bytes" + case rx + case rxBytes = "rx_bytes" + case txmsgs + case txmsgBytes = "txmsg_bytes" + case rxmsgs + case rxmsgBytes = "rxmsg_bytes" + } +} + +// MARK: - Broker + +public struct Broker: Hashable, Codable { + let name: String? + let nodeid: Int? + let nodename, source, state: String? + let stateage, outbufCnt, outbufMsgCnt, waitrespCnt: Int? + let waitrespMsgCnt, tx, txbytes, txerrs: Int? + let txretries, txidle, reqTimeouts, rx: Int? + let rxbytes, rxerrs, rxcorriderrs, rxpartial: Int? + let rxidle, zbufGrow, bufGrow, wakeups: Int? + let connects, disconnects: Int? + let intLatency, outbufLatency, rtt, throttle: [String: Int]? + let req: [String: Int]? + let toppars: [String: Toppar]? + + enum CodingKeys: String, CodingKey { + case name, nodeid, nodename, source, state, stateage + case outbufCnt = "outbuf_cnt" + case outbufMsgCnt = "outbuf_msg_cnt" + case waitrespCnt = "waitresp_cnt" + case waitrespMsgCnt = "waitresp_msg_cnt" + case tx, txbytes, txerrs, txretries, txidle + case reqTimeouts = "req_timeouts" + case rx, rxbytes, rxerrs, rxcorriderrs, rxpartial, rxidle + case zbufGrow = "zbuf_grow" + case bufGrow = "buf_grow" + case wakeups, connects, disconnects + case intLatency = "int_latency" + case outbufLatency = "outbuf_latency" + case rtt, throttle, req, toppars + } +} + +// MARK: - Toppars + +struct Toppar: Hashable, Codable { + let topic: String? + let partition: Int? + + enum CodingKeys: String, CodingKey { + case topic, partition + } +} + +// MARK: - Cgrp + +struct Cgrp: Hashable, Codable { + let state: String? + let stateage: Int? + let joinState: String? + let rebalanceAge, rebalanceCnt: Int? + let rebalanceReason: String? + let assignmentSize: Int? + + enum CodingKeys: String, CodingKey { + case state, stateage + case joinState = "join_state" + case rebalanceAge = "rebalance_age" + case rebalanceCnt = "rebalance_cnt" + case rebalanceReason = "rebalance_reason" + case assignmentSize = "assignment_size" + } +} + +// MARK: - Topic + +struct Topic: Hashable, Codable { + let topic: String? + let age, metadataAge: Int? + let batchsize, batchcnt: [String: Int]? + let partitions: [String: Partition]? + + enum CodingKeys: String, CodingKey { + case topic, age + case metadataAge = "metadata_age" + case batchsize, batchcnt, partitions + } +} + +// MARK: - Partition + +struct Partition: Hashable, Codable { + let partition, broker, leader: Int? + let desired, unknown: Bool? + let msgqCnt, msgqBytes, xmitMsgqCnt, xmitMsgqBytes: Int? + let fetchqCnt, fetchqSize: Int? + let fetchState: String? + let queryOffset, nextOffset, appOffset, storedOffset: Int? + let commitedOffset, committedOffset, eofOffset, loOffset: Int? + let hiOffset, lsOffset, consumerLag, consumerLagStored: Int? + let txmsgs, txbytes, rxmsgs, rxbytes: Int? + let msgs, rxVerDrops, msgsInflight, nextACKSeq: Int? + let nextErrSeq, ackedMsgid: Int? + + enum CodingKeys: String, CodingKey { + case partition, broker, leader, desired, unknown + case msgqCnt = "msgq_cnt" + case msgqBytes = "msgq_bytes" + case xmitMsgqCnt = "xmit_msgq_cnt" + case xmitMsgqBytes = "xmit_msgq_bytes" + case fetchqCnt = "fetchq_cnt" + case fetchqSize = "fetchq_size" + case fetchState = "fetch_state" + case queryOffset = "query_offset" + case nextOffset = "next_offset" + case appOffset = "app_offset" + case storedOffset = "stored_offset" + case commitedOffset = "commited_offset" + case committedOffset = "committed_offset" + case eofOffset = "eof_offset" + case loOffset = "lo_offset" + case hiOffset = "hi_offset" + case lsOffset = "ls_offset" + case consumerLag = "consumer_lag" + case consumerLagStored = "consumer_lag_stored" + case txmsgs, txbytes, rxmsgs, rxbytes, msgs + case rxVerDrops = "rx_ver_drops" + case msgsInflight = "msgs_inflight" + case nextACKSeq = "next_ack_seq" + case nextErrSeq = "next_err_seq" + case ackedMsgid = "acked_msgid" + } +} diff --git a/Tests/SwiftKafkaTests/KafkaConsumerTests.swift b/Tests/SwiftKafkaTests/KafkaConsumerTests.swift index 6f2fab18..d2591353 100644 --- a/Tests/SwiftKafkaTests/KafkaConsumerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaConsumerTests.swift @@ -14,6 +14,7 @@ import struct Foundation.UUID import Logging +import NIOConcurrencyHelpers import ServiceLifecycle @testable import SwiftKafka import XCTest @@ -85,4 +86,105 @@ final class KafkaConsumerTests: XCTestCase { ) } } +/* + func testConsumerStatistics() async throws { + // Set no bootstrap servers to trigger librdkafka configuration warning + let uniqueGroupID = UUID().uuidString + var config = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]) + ) + config.statisticsInterval = Duration.milliseconds(100) + + let stringJson = NIOLockedValueBox(String()) + let consumer = try KafkaConsumer(config: config, logger: .kafkaTest) + + guard let statistics = consumer.statistics else { + XCTFail("Statistics was not instantiated") + return + } + + let serviceGroup = ServiceGroup( + services: [consumer], + configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []), + logger: .kafkaTest + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // check for librdkafka statistics + group.addTask { + for try await stat in statistics { + stringJson.withLockedValue { + $0 = stat + } + } + } + + // Sleep for 1s to let poll loop receive statistics callback + try! await Task.sleep(for: .milliseconds(500)) + + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + + try await group.next() + } + + let stats = stringJson.withLockedValue { $0 } + XCTAssertFalse(stats.isEmpty) + } + + func testConsumerStatisticsJson() async throws { + // Set no bootstrap servers to trigger librdkafka configuration warning + let uniqueGroupID = UUID().uuidString + var config = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]) + ) + config.statisticsInterval = Duration.milliseconds(100) + + let stringJson = NIOLockedValueBox(nil) + let consumer = try KafkaConsumer(config: config, logger: .kafkaTest) + + guard let statistics = consumer.statistics else { + XCTFail("Statistics was not instantiated") + return + } + + let serviceGroup = ServiceGroup( + services: [consumer], + configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []), + logger: .kafkaTest + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // check for librdkafka statistics + group.addTask { + for try await stat in KafkaStatisticsJsonSequence(wrappedSequence: statistics) { + stringJson.withLockedValue { + $0 = stat + } + } + } + + // Sleep for 1s to let poll loop receive statistics callback + try! await Task.sleep(for: .milliseconds(500)) + + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + + try await group.next() + } + + let stats = stringJson.withLockedValue { $0 } + XCTAssertNotNil(stats) + } + */ } diff --git a/Tests/SwiftKafkaTests/KafkaProducerTests.swift b/Tests/SwiftKafkaTests/KafkaProducerTests.swift index a551c386..e7b16d09 100644 --- a/Tests/SwiftKafkaTests/KafkaProducerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaProducerTests.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Logging +import NIOConcurrencyHelpers import NIOCore import ServiceLifecycle @testable import SwiftKafka @@ -356,4 +357,57 @@ final class KafkaProducerTests: XCTestCase { XCTAssertNil(producerCopy) } + + func testProducerStatistics() async throws { + self.config.statisticsInterval = Duration.milliseconds(100) + self.config.debug = [.all] + + let statistics = NIOLockedValueBox(nil) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + config: self.config, + logger: .kafkaTest + ) + + let serviceGroup = ServiceGroup( + services: [producer], + configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []), + logger: .kafkaTest + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // check for librdkafka statistics + group.addTask { + for try await e in events { + switch e { + case .statistics(let stat): + statistics.withLockedValue { + $0 = stat + } + default: + break + } + } + } + + // Sleep for 1s to let poll loop receive statistics callback + try! await Task.sleep(for: .milliseconds(500)) + + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + + try await group.next() + } + let stats = statistics.withLockedValue { $0 } + guard let stats else { + XCTFail("stats are not occurred") + return + } + XCTAssertFalse(stats.jsonString.isEmpty) + XCTAssertNoThrow(try stats.json) + } }