Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/sc-1976/gsoc-…
Browse files Browse the repository at this point in the history
…expose-librdkafka-statistics
  • Loading branch information
blindspotbounty committed Jul 24, 2023
2 parents cac1442 + 5b07fe2 commit 2f2d828
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 135 deletions.
11 changes: 11 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ public struct KafkaProducerConfiguration {
/// >= 1ms - statistics provided every specified interval
public var statisticsInterval: Duration = .zero

/// Maximum timeout for flushing outstanding produce requests when the ``KakfaProducer`` is shutting down.
/// Default: `10000`
public var flushTimeoutMilliseconds: Int = 10000 {
didSet {
precondition(
0...Int(Int32.max) ~= self.flushTimeoutMilliseconds,
"Flush timeout outside of valid range \(0...Int32.max)"
)
}
}

// MARK: - Producer-specific Config Properties

/// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
Expand Down
29 changes: 14 additions & 15 deletions Sources/SwiftKafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
//
//===----------------------------------------------------------------------===//

import Crdkafka
import Logging
import NIOConcurrencyHelpers
import NIOCore
Expand Down Expand Up @@ -128,7 +127,7 @@ public final class KafkaConsumer: Sendable, Service {
self.config = config
self.logger = logger

let client = try RDKafka.createClient(
let client = try RDKafkaClient.makeClient(
type: .consumer,
configDictionary: config.dictionary,
events: [.log, .fetch, .offsetCommit],
Expand Down Expand Up @@ -164,8 +163,8 @@ public final class KafkaConsumer: Sendable, Service {
)
}

// Events that would be triggered by ``KafkaClient/poll(timeout:)``
// are now triggered by ``KafkaClient/consumerPoll``.
// Events that would be triggered by ``RDKafkaClient/poll(timeout:)``
// are now triggered by ``RDKafkaClient/consumerPoll``.
try client.pollSetConsumer()

switch config.consumptionStrategy._internal {
Expand Down Expand Up @@ -302,7 +301,7 @@ public final class KafkaConsumer: Sendable, Service {
}

private func _triggerGracefulShutdown(
client: KafkaClient,
client: RDKafkaClient,
logger: Logger
) {
do {
Expand Down Expand Up @@ -336,7 +335,7 @@ extension KafkaConsumer {
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case initializing(
client: KafkaClient,
client: RDKafkaClient,
source: Producer.Source,
statisticsSource: StatisticsProducer.Source?
)
Expand All @@ -346,15 +345,15 @@ extension KafkaConsumer {
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
/// - Parameter statisticsSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new statistics elements.
case consuming(
client: KafkaClient,
client: RDKafkaClient,
source: Producer.Source?,
statisticsSource: StatisticsProducer.Source?
)
/// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
/// We are now in the process of commiting our last state to the broker.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case finishing(client: KafkaClient)
case finishing(client: RDKafkaClient)
/// The ``KafkaConsumer`` is closed.
case finished
}
Expand All @@ -365,7 +364,7 @@ extension KafkaConsumer {
/// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are
/// not yet available when the normal initialization occurs.
mutating func initialize(
client: KafkaClient,
client: RDKafkaClient,
source: Producer.Source,
statisticsSource: StatisticsProducer.Source?
) {
Expand All @@ -387,7 +386,7 @@ extension KafkaConsumer {
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
/// - Parameter statisticsSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new statistics elements.
case pollForAndYieldMessage(
client: KafkaClient,
client: RDKafkaClient,
source: Producer.Source?,
statisticsSource: StatisticsProducer.Source?
)
Expand Down Expand Up @@ -423,7 +422,7 @@ extension KafkaConsumer {
enum SetUpConnectionAction {
/// Set up the connection through ``subscribe()`` or ``assign()``.
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case setUpConnection(client: KafkaClient)
case setUpConnection(client: RDKafkaClient)
}

/// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``.
Expand Down Expand Up @@ -482,7 +481,7 @@ extension KafkaConsumer {
enum StoreOffsetAction {
/// Store the message offset with the given `client`.
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case storeOffset(client: KafkaClient)
case storeOffset(client: RDKafkaClient)
}

/// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
Expand All @@ -508,7 +507,7 @@ extension KafkaConsumer {
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case commitSync(
client: KafkaClient
client: RDKafkaClient
)
/// Throw an error. The ``KafkaConsumer`` is closed.
case throwClosedError
Expand Down Expand Up @@ -540,14 +539,14 @@ extension KafkaConsumer {
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case triggerGracefulShutdown(
client: KafkaClient
client: RDKafkaClient
)
/// Shut down the ``KafkaConsumer`` and finish the given `source` object.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case triggerGracefulShutdownAndFinishSource(
client: KafkaClient,
client: RDKafkaClient,
source: Producer.Source,
statisticsSource: StatisticsProducer.Source?
)
Expand Down
56 changes: 27 additions & 29 deletions Sources/SwiftKafka/KafkaProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
//
//===----------------------------------------------------------------------===//

import Crdkafka
import Logging
import NIOConcurrencyHelpers
import NIOCore
Expand Down Expand Up @@ -136,7 +135,7 @@ public final class KafkaProducer: Service, Sendable {
}
}

let client = try RDKafka.createClient(
let client = try RDKafkaClient.makeClient(
type: .producer,
configDictionary: config.dictionary,
events: [.log], // No .deliveryReport here!
Expand Down Expand Up @@ -189,7 +188,7 @@ public final class KafkaProducer: Service, Sendable {
)
let source = sourceAndSequence.source

let client = try RDKafka.createClient(
let client = try RDKafkaClient.makeClient(
type: .producer,
configDictionary: config.dictionary,
events: [.log, .deliveryReport],
Expand Down Expand Up @@ -268,9 +267,15 @@ public final class KafkaProducer: Service, Sendable {
if !flushing || events.isEmpty {
try await Task.sleep(for: self.config.pollInterval)
}
case .terminatePollLoopAndFinishSource(let source, let statisticsSource):
case .flushFinishSourceAndTerminatePollLoop(let client, let source, let statisticsSource):
precondition(
0...Int(Int32.max) ~= self.config.flushTimeoutMilliseconds,
"Flush timeout outside of valid range \(0...Int32.max)"
)
try await client.flush(timeoutMilliseconds: Int32(self.config.flushTimeoutMilliseconds))
source?.finish()
statisticsSource?.finish()
statisticsSource?.finish()
return
case .terminatePollLoop:
return
Expand Down Expand Up @@ -329,7 +334,7 @@ extension KafkaProducer {
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
/// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
case started(
client: KafkaClient,
client: RDKafkaClient,
messageIDCounter: UInt,
source: Producer.Source?,
statisticsSource: StatisticsProducer.Source?,
Expand All @@ -340,8 +345,8 @@ extension KafkaProducer {
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case flushing(
client: KafkaClient,
case finishing(
client: RDKafkaClient,
source: Producer.Source?,
statisticsSource: StatisticsProducer.Source?
)
Expand All @@ -355,7 +360,7 @@ extension KafkaProducer {
/// Delayed initialization of `StateMachine` as the `source` is not yet available
/// when the normal initialization occurs.
mutating func initialize(
client: KafkaClient,
client: RDKafkaClient,
source: Producer.Source?,
statisticsSource: StatisticsProducer.Source?
) {
Expand All @@ -377,11 +382,12 @@ extension KafkaProducer {
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case pollAndYield(client: KafkaClient, source: Producer.Source?, statisticsSource: StatisticsProducer.Source?, flushing: Bool = false)
case pollAndYield(client: RDKafkaClient, source: Producer.Source?, statisticsSource: StatisticsProducer.Source?, flushing: Bool = false)
/// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case terminatePollLoopAndFinishSource(source: Producer.Source?, statisticsSource: StatisticsProducer.Source?)
case flushFinishSourceAndTerminatePollLoop(client: RDKafkaClient, source: Producer.Source?, statisticsSource: StatisticsProducer.Source?)
/// Terminate the poll loop.
case terminatePollLoop
}
Expand All @@ -396,13 +402,8 @@ extension KafkaProducer {
fatalError("\(#function) invoked while still in state \(self.state)")
case .started(let client, _, let source, let statisticsSource, _):
return .pollAndYield(client: client, source: source, statisticsSource: statisticsSource)
case .flushing(let client, let source, let statisticsSource):
if client.outgoingQueueSize > 0 {
return .pollAndYield(client: client, source: source, statisticsSource: statisticsSource, flushing: true)
} else {
self.state = .finished
return .terminatePollLoopAndFinishSource(source: source, statisticsSource: statisticsSource)
}
case .finishing(let client, let source, let statisticsSource):
return .flushFinishSourceAndTerminatePollLoop(client: client, source: source, statisticsSource: statisticsSource)
case .finished:
return .terminatePollLoop
}
Expand All @@ -414,7 +415,7 @@ extension KafkaProducer {
///
/// - Important: `newMessageID` is the new message ID assigned to the message to be sent.
case send(
client: KafkaClient,
client: RDKafkaClient,
newMessageID: UInt,
topicHandles: RDKafkaTopicHandles
)
Expand Down Expand Up @@ -444,8 +445,8 @@ extension KafkaProducer {
newMessageID: newMessageID,
topicHandles: topicHandles
)
case .flushing:
throw KafkaError.connectionClosed(reason: "Producer in the process of flushing and shutting down")
case .finishing:
throw KafkaError.connectionClosed(reason: "Producer in the process of finishing")
case .finished:
throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer")
}
Expand Down Expand Up @@ -479,9 +480,9 @@ extension KafkaProducer {
}
self.state = .started(client: client, messageIDCounter: counter, source: nil, statisticsSource: statisticsSource, topicHandles: topicHandlers)
return .finishSource(source: source)
case .flushing(let client, let source, let statisticsSource):
case .finishing(let client, let source, let statisticsSource):
// Setting source to nil to prevent incoming acknowledgements from buffering in `source`
self.state = .flushing(client: client, source: nil, statisticsSource: statisticsSource)
self.state = .finishing(client: client, source: nil, statisticsSource: statisticsSource)
return .finishSource(source: source)
case .finished:
break
Expand All @@ -498,13 +499,10 @@ extension KafkaProducer {
fatalError("stopStatistics() must not be invoked more than once")
}
self.state = .started(client: client, messageIDCounter: counter, source: source, statisticsSource: nil, topicHandles: topicHandlers)
client.withKafkaHandlePointer { kafkaHandle in
rd_kafka_conf_set_stats_cb(<#T##conf: OpaquePointer!##OpaquePointer!#>, <#T##stats_cb: ((OpaquePointer?, UnsafeMutablePointer<CChar>?, Int, UnsafeMutableRawPointer?) -> Int32)!##((OpaquePointer?, UnsafeMutablePointer<CChar>?, Int, UnsafeMutableRawPointer?) -> Int32)!##(OpaquePointer?, UnsafeMutablePointer<CChar>?, Int, UnsafeMutableRawPointer?) -> Int32#>)
}
return .finishStatisticsSource(statisticsSource: statisticsSource)
case .flushing(let client, let source, let statisticsSource):
case .finishing(let client, let source, let statisticsSource):
// Setting source to nil to prevent incoming acknowledgements from buffering in `source`
self.state = .flushing(client: client, source: source, statisticsSource: nil)
self.state = .finishing(client: client, source: source, statisticsSource: nil)
return .finishStatisticsSource(statisticsSource: statisticsSource)
case .finished:
break
Expand All @@ -520,8 +518,8 @@ extension KafkaProducer {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .started(let client, _, let source, let statisticsSource, _):
self.state = .flushing(client: client, source: source, statisticsSource: statisticsSource)
case .flushing, .finished:
self.state = .finishing(client: client, source: source, statisticsSource: statisticsSource)
case .finishing, .finished:
break
}
}
Expand Down
63 changes: 0 additions & 63 deletions Sources/SwiftKafka/RDKafka/RDKafka.swift

This file was deleted.

Loading

0 comments on commit 2f2d828

Please sign in to comment.