introduce statistics for producer
blindspotbounty committed Jul 26, 2023
1 parent f8cb0a0 commit c870864
Showing 9 changed files with 390 additions and 0 deletions.
Package.swift (2 additions, 0 deletions)

@@ -47,6 +47,7 @@ let package = Package(
// The zstd Swift package produces warnings that we cannot resolve:
// https://github.com/facebook/zstd/issues/3328
.package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
.package(url: "https://github.com/swift-extras/swift-extras-json.git", .upToNextMajor(from: "0.6.0")),
],
targets: [
.target(
@@ -76,6 +77,7 @@ let package = Package(
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
.product(name: "Logging", package: "swift-log"),
.product(name: "ExtrasJSON", package: "swift-extras-json"),
]
),
.systemLibrary(
Sources/SwiftKafka/Configuration/KafkaConfiguration.swift (7 additions, 0 deletions)

@@ -206,3 +206,10 @@ public enum KafkaConfiguration {
public static let v6 = IPAddressFamily(description: "v6")
}
}

extension Duration {
/// Total duration expressed in whole milliseconds (sub-millisecond precision is truncated).
internal var totalMilliseconds: Int64 {
self.components.seconds * 1000 + self.components.attoseconds / 1_000_000_000_000_000
}
}
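As a quick illustration of the truncating conversion above (values chosen for illustration; this snippet is not part of the commit):

```swift
let interval: Duration = .milliseconds(1500)
// components.seconds == 1 and components.attoseconds == 5 * 10^17,
// so 1 * 1000 + (5 * 10^17) / 10^15 == 1500
print(interval.totalMilliseconds) // 1500

// Sub-millisecond precision is truncated by the integer division:
print(Duration.microseconds(2500).totalMilliseconds) // 2
```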
Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift (11 additions, 0 deletions)

@@ -20,6 +20,11 @@ public struct KafkaProducerConfiguration {
/// Default: `.milliseconds(100)`
public var pollInterval: Duration = .milliseconds(100)

/// Interval at which librdkafka emits statistics reports.
/// `.zero` (the default): statistics are disabled.
/// >= 1 ms: a statistics event is emitted once per interval.
public var statisticsInterval: Duration = .zero

/// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
/// Default: `10000`
public var flushTimeoutMilliseconds: Int = 10000 {
@@ -107,6 +112,12 @@ extension KafkaProducerConfiguration {
internal var dictionary: [String: String] {
var resultDict: [String: String] = [:]

// We only check that the interval is zero or at least 1 ms;
// librdkafka itself rejects negative values, in both debug and release builds.
// FIXME: should we make this `get throws` and throw an error instead of asserting?
assert(self.statisticsInterval == .zero || self.statisticsInterval >= Duration.milliseconds(1), "Statistics interval must be zero (disabled) or at least 1 millisecond")
resultDict["statistics.interval.ms"] = String(self.statisticsInterval.totalMilliseconds)

resultDict["enable.idempotence"] = String(self.enableIdempotence)
resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
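To illustrate how the new option flows through `dictionary` into librdkafka, a minimal sketch; it assumes `KafkaProducerConfiguration` has a plain initializer, and `dictionary` is internal to the module, so the final comparison is only illustrative:

```swift
var config = KafkaProducerConfiguration()
config.statisticsInterval = .seconds(5) // emit a statistics event every 5 seconds

// Within the module, the derived librdkafka settings now include:
// config.dictionary["statistics.interval.ms"] == "5000"
```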
Sources/SwiftKafka/KafkaProducerEvent.swift (4 additions, 0 deletions)

@@ -16,13 +16,17 @@
public enum KafkaProducerEvent: Sendable, Hashable {
/// A collection of delivery reports received from the Kafka cluster indicating the status of produced messages.
case deliveryReports([KafkaDeliveryReport])
/// A statistics report received from librdkafka.
case statistics(KafkaStatistics)
/// - Important: Always provide a `default` case when switching over this `enum`.
case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY

internal init(_ event: RDKafkaClient.KafkaEvent) {
switch event {
case .deliveryReport(results: let results):
self = .deliveryReports(results)
case .statistics(let stat):
self = .statistics(stat)
case .consumerMessages:
fatalError("Cannot cast \(event) to KafkaProducerEvent")
}
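Downstream, a consumer of the producer's event stream might handle the new case as follows (a sketch; the `events` async sequence is assumed to come from the producer's event-stream factory, which is not part of this diff):

```swift
for await event in events {
    switch event {
    case .deliveryReports(let reports):
        print("delivered \(reports.count) message(s)")
    case .statistics(let statistics):
        print("statistics: \(statistics.jsonString.prefix(120))")
    default:
        break // always keep a default: this enum must not be switched over exhaustively
    }
}
```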
Sources/SwiftKafka/RDKafka/RDKafkaClient.swift (8 additions, 0 deletions)

@@ -136,6 +136,7 @@ final class RDKafkaClient: Sendable {
enum KafkaEvent {
case deliveryReport(results: [KafkaDeliveryReport])
case consumerMessages(result: Result<KafkaConsumerMessage, Error>)
case statistics(KafkaStatistics)
}

/// Poll the event `rd_kafka_queue_t` for new events.
@@ -166,6 +167,8 @@
self.handleLogEvent(event)
case .offsetCommit:
self.handleOffsetCommitEvent(event)
case .statistics:
events.append(self.handleStatistics(event))
case .none:
// Finished reading events, return early
return events
@@ -217,6 +220,11 @@
// The returned message(s) MUST NOT be freed with rd_kafka_message_destroy().
}

/// Handle event of type `RDKafkaEvent.statistics`.
///
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
private func handleStatistics(_ event: OpaquePointer?) -> KafkaEvent {
let jsonString = String(cString: rd_kafka_event_stats(event))
return .statistics(KafkaStatistics(jsonString: jsonString))
}

/// Handle event of type `RDKafkaEvent.log`.
///
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
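For statistics events to surface in the poll loop above, librdkafka must be told to enqueue them: `RD_KAFKA_EVENT_STATS` has to be part of the event mask passed to `rd_kafka_conf_set_events`. That wiring is not visible in this diff (it is presumably in one of the files that did not load), so the following is only a sketch of the shape:

```swift
import Crdkafka // the package's librdkafka system-library target

// Hypothetical sketch: `configPointer` is assumed to be the client's
// rd_kafka_conf_t pointer; the actual call site in this commit may differ.
func enableStatisticsEvents(on configPointer: OpaquePointer) {
    rd_kafka_conf_set_events(
        configPointer,
        RD_KAFKA_EVENT_DR | RD_KAFKA_EVENT_LOG | RD_KAFKA_EVENT_STATS
    )
}
```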
Sources/SwiftKafka/Utilities/KafkaStatistics.swift (25 additions, 0 deletions)

@@ -0,0 +1,25 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import ExtrasJSON

public struct KafkaStatistics: Sendable, Hashable {
public let jsonString: String

public var json: KafkaStatisticsJson {
get throws {
return try XJSONDecoder().decode(KafkaStatisticsJson.self, from: self.jsonString.utf8)
}
}
}
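Callers can then obtain the typed model through the throwing accessor; a minimal module-internal sketch (the model's properties are not public):

```swift
func logThroughput(_ statistics: KafkaStatistics) {
    do {
        let model = try statistics.json
        print("txmsgs: \(model.txmsgs ?? 0), tx_bytes: \(model.txBytes ?? 0)")
    } catch {
        print("failed to decode statistics JSON: \(error)")
    }
}
```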
Sources/SwiftKafka/Utilities/KafkaStatisticsJsonModel.swift (177 additions, 0 deletions)

@@ -0,0 +1,177 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

// This file was generated from JSON Schema using quicktype, do not modify it directly.
// To parse the JSON, add this file to your project and do:
//
// let statistics = try XJSONDecoder().decode(KafkaStatisticsJson.self, from: Array(jsonData))

// MARK: - Statistics

public struct KafkaStatisticsJson: Hashable, Codable {
let name, clientID, type: String?
let ts, time, age, replyq: Int?
let msgCnt, msgSize, msgMax, msgSizeMax: Int?
let simpleCnt, metadataCacheCnt: Int?
let brokers: [String: Broker]?
let topics: [String: Topic]?
let cgrp: Cgrp?
let tx, txBytes, rx, rxBytes: Int?
let txmsgs, txmsgBytes, rxmsgs, rxmsgBytes: Int?

enum CodingKeys: String, CodingKey {
case name
case clientID = "client_id"
case type, ts, time, age, replyq
case msgCnt = "msg_cnt"
case msgSize = "msg_size"
case msgMax = "msg_max"
case msgSizeMax = "msg_size_max"
case simpleCnt = "simple_cnt"
case metadataCacheCnt = "metadata_cache_cnt"
case brokers, topics, cgrp, tx
case txBytes = "tx_bytes"
case rx
case rxBytes = "rx_bytes"
case txmsgs
case txmsgBytes = "txmsg_bytes"
case rxmsgs
case rxmsgBytes = "rxmsg_bytes"
}
}

// MARK: - Broker

public struct Broker: Hashable, Codable {
let name: String?
let nodeid: Int?
let nodename, source, state: String?
let stateage, outbufCnt, outbufMsgCnt, waitrespCnt: Int?
let waitrespMsgCnt, tx, txbytes, txerrs: Int?
let txretries, txidle, reqTimeouts, rx: Int?
let rxbytes, rxerrs, rxcorriderrs, rxpartial: Int?
let rxidle, zbufGrow, bufGrow, wakeups: Int?
let connects, disconnects: Int?
let intLatency, outbufLatency, rtt, throttle: [String: Int]?
let req: [String: Int]?
let toppars: [String: Toppar]?

enum CodingKeys: String, CodingKey {
case name, nodeid, nodename, source, state, stateage
case outbufCnt = "outbuf_cnt"
case outbufMsgCnt = "outbuf_msg_cnt"
case waitrespCnt = "waitresp_cnt"
case waitrespMsgCnt = "waitresp_msg_cnt"
case tx, txbytes, txerrs, txretries, txidle
case reqTimeouts = "req_timeouts"
case rx, rxbytes, rxerrs, rxcorriderrs, rxpartial, rxidle
case zbufGrow = "zbuf_grow"
case bufGrow = "buf_grow"
case wakeups, connects, disconnects
case intLatency = "int_latency"
case outbufLatency = "outbuf_latency"
case rtt, throttle, req, toppars
}
}

// MARK: - Toppars

struct Toppar: Hashable, Codable {
let topic: String?
let partition: Int?

enum CodingKeys: String, CodingKey {
case topic, partition
}
}

// MARK: - Cgrp

struct Cgrp: Hashable, Codable {
let state: String?
let stateage: Int?
let joinState: String?
let rebalanceAge, rebalanceCnt: Int?
let rebalanceReason: String?
let assignmentSize: Int?

enum CodingKeys: String, CodingKey {
case state, stateage
case joinState = "join_state"
case rebalanceAge = "rebalance_age"
case rebalanceCnt = "rebalance_cnt"
case rebalanceReason = "rebalance_reason"
case assignmentSize = "assignment_size"
}
}

// MARK: - Topic

struct Topic: Hashable, Codable {
let topic: String?
let age, metadataAge: Int?
let batchsize, batchcnt: [String: Int]?
let partitions: [String: Partition]?

enum CodingKeys: String, CodingKey {
case topic, age
case metadataAge = "metadata_age"
case batchsize, batchcnt, partitions
}
}

// MARK: - Partition

struct Partition: Hashable, Codable {
let partition, broker, leader: Int?
let desired, unknown: Bool?
let msgqCnt, msgqBytes, xmitMsgqCnt, xmitMsgqBytes: Int?
let fetchqCnt, fetchqSize: Int?
let fetchState: String?
let queryOffset, nextOffset, appOffset, storedOffset: Int?
// Note: librdkafka emits both the legacy misspelled "commited_offset" and "committed_offset".
let commitedOffset, committedOffset, eofOffset, loOffset: Int?
let hiOffset, lsOffset, consumerLag, consumerLagStored: Int?
let txmsgs, txbytes, rxmsgs, rxbytes: Int?
let msgs, rxVerDrops, msgsInflight, nextACKSeq: Int?
let nextErrSeq, ackedMsgid: Int?

enum CodingKeys: String, CodingKey {
case partition, broker, leader, desired, unknown
case msgqCnt = "msgq_cnt"
case msgqBytes = "msgq_bytes"
case xmitMsgqCnt = "xmit_msgq_cnt"
case xmitMsgqBytes = "xmit_msgq_bytes"
case fetchqCnt = "fetchq_cnt"
case fetchqSize = "fetchq_size"
case fetchState = "fetch_state"
case queryOffset = "query_offset"
case nextOffset = "next_offset"
case appOffset = "app_offset"
case storedOffset = "stored_offset"
case commitedOffset = "commited_offset"
case committedOffset = "committed_offset"
case eofOffset = "eof_offset"
case loOffset = "lo_offset"
case hiOffset = "hi_offset"
case lsOffset = "ls_offset"
case consumerLag = "consumer_lag"
case consumerLagStored = "consumer_lag_stored"
case txmsgs, txbytes, rxmsgs, rxbytes, msgs
case rxVerDrops = "rx_ver_drops"
case msgsInflight = "msgs_inflight"
case nextACKSeq = "next_ack_seq"
case nextErrSeq = "next_err_seq"
case ackedMsgid = "acked_msgid"
}
}
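To see the snake_case key mapping in action, a small decoding sketch with a hand-written, truncated payload (illustrative values, not real librdkafka output):

```swift
import ExtrasJSON

let sample = #"{"name": "rdkafka#producer-1", "client_id": "rdkafka", "type": "producer", "txmsgs": 42, "txmsg_bytes": 1024}"#
let stats = try XJSONDecoder().decode(KafkaStatisticsJson.self, from: Array(sample.utf8))
print(stats.txmsgs ?? 0)     // 42
print(stats.txmsgBytes ?? 0) // 1024
```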
[The diffs for the remaining 2 changed files did not load.]
