Skip to content

Commit

Permalink
Fix errors after rebase
Browse files Browse the repository at this point in the history
Modifications:

* move `NoBackPressure` struct to `extension` of
  `NIOAsyncSequenceProducerBackPressureStrategies`
* break down duplicate `ShutDownOnTerminate` type into two more
  specialised types for `KafkaConsumer` and `KafkaProducer`
* add missing `config` parameter to `KafkaProducer`'s initialiser
  • Loading branch information
felixschlegel committed Jun 29, 2023
1 parent 69ca486 commit 34f2d72
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 34 deletions.
25 changes: 9 additions & 16 deletions Sources/SwiftKafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,15 @@ import Logging
import NIOConcurrencyHelpers
import NIOCore

// MARK: - NoBackPressure

/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
func didYield(bufferDepth: Int) -> Bool { true }
func didConsume(bufferDepth: Int) -> Bool { true }
}

// MARK: - ShutDownOnTerminate
// MARK: - KafkaConsumerShutDownOnTerminate

/// `NIOAsyncSequenceProducerDelegate` that terminates the shuts the consumer down when
/// `didTerminate()` is invoked.
struct ShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock
internal struct KafkaConsumerShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock
let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
}

extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
extension KafkaConsumerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
func produceMore() {
// No back pressure
return
Expand Down Expand Up @@ -68,7 +60,8 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
/// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
public struct KafkaConsumerMessages: AsyncSequence {
public typealias Element = KafkaConsumerMessage
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, NoBackPressure, ShutdownOnTerminate>
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaConsumerShutdownOnTerminate>
let wrappedSequence: WrappedSequence

/// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
Expand All @@ -91,8 +84,8 @@ public struct KafkaConsumerMessages: AsyncSequence {
public final class KafkaConsumer {
typealias Producer = NIOAsyncSequenceProducer<
KafkaConsumerMessage,
NoBackPressure,
ShutdownOnTerminate
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
KafkaConsumerShutdownOnTerminate
>
/// The configuration object of the consumer client.
private var config: KafkaConsumerConfiguration
Expand Down Expand Up @@ -123,8 +116,8 @@ public final class KafkaConsumer {

let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
elementType: KafkaConsumerMessage.self,
backPressureStrategy: NoBackPressure(),
delegate: ShutdownOnTerminate(stateMachine: self.stateMachine)
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
delegate: KafkaConsumerShutdownOnTerminate(stateMachine: self.stateMachine)
)

self.messages = KafkaConsumerMessages(
Expand Down
36 changes: 19 additions & 17 deletions Sources/SwiftKafka/KafkaProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,15 @@ import Logging
import NIOConcurrencyHelpers
import NIOCore

/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
func didYield(bufferDepth: Int) -> Bool { true }
func didConsume(bufferDepth: Int) -> Bool { true }
}

// MARK: - ShutDownOnTerminate
// MARK: - KafkaProducerShutdownOnTerminate

/// `NIOAsyncSequenceProducerDelegate` that terminates the shuts the producer down when
/// `didTerminate()` is invoked.
struct ShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock
internal struct KafkaProducerShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock
let stateMachine: NIOLockedValueBox<KafkaProducer.StateMachine>
}

extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
extension KafkaProducerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
func produceMore() {
// No back pressure
return
Expand All @@ -55,10 +49,13 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
}
}

// MARK: - KafkaMessageAcknowledgements

/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
public struct KafkaMessageAcknowledgements: AsyncSequence {
public typealias Element = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, NoBackPressure, ShutdownOnTerminate>
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaProducerShutdownOnTerminate>
let wrappedSequence: WrappedSequence

/// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
Expand All @@ -82,29 +79,32 @@ public struct KafkaMessageAcknowledgements: AsyncSequence {
public final class KafkaProducer {
typealias Producer = NIOAsyncSequenceProducer<
Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>,
NoBackPressure,
ShutdownOnTerminate
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
KafkaProducerShutdownOnTerminate
>

/// State of the ``KafkaProducer``.
private let stateMachine: NIOLockedValueBox<StateMachine>

/// The configuration object of the producer client.
private let config: KafkaProducerConfiguration
/// Topic configuration that is used when a new topic has to be created by the producer.
private let topicConfig: KafkaTopicConfiguration

// Private initializer, use factory methods to create KafkaProducer
/// Initialize a new ``KafkaProducer``.
///
/// - Parameter client: The ``KafkaClient`` instance associated with the ``KafkaProducer``.
/// - Parameter stateMachine: The ``KafkaProducer/StateMachine`` instance associated with the ``KafkaProducer``.///
/// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
private init(
stateMachine: NIOLockedValueBox<KafkaProducer.StateMachine>,
config: KafkaProducerConfiguration,
topicConfig: KafkaTopicConfiguration
) throws {
self.stateMachine = stateMachine
self.config = config
self.topicConfig = topicConfig
}

Expand Down Expand Up @@ -135,6 +135,7 @@ public final class KafkaProducer {

let producer = try KafkaProducer(
stateMachine: stateMachine,
config: config,
topicConfig: topicConfig
)

Expand Down Expand Up @@ -169,8 +170,8 @@ public final class KafkaProducer {

let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
elementType: Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>.self,
backPressureStrategy: NoBackPressure(),
delegate: ShutdownOnTerminate(stateMachine: stateMachine)
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
delegate: KafkaProducerShutdownOnTerminate(stateMachine: stateMachine)
)
let source = sourceAndSequence.source

Expand All @@ -191,6 +192,7 @@ public final class KafkaProducer {

let producer = try KafkaProducer(
stateMachine: stateMachine,
config: config,
topicConfig: topicConfig
)

Expand Down Expand Up @@ -405,7 +407,7 @@ extension KafkaProducer {
}

/// Returns the next action to be taken when wanting to poll.
/// - Returns: The next action to be taken, either polling or killing the poll loop.
/// - Returns: The next action to be taken, either polling or terminating the poll loop.
///
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
func nextPollLoopAction() -> PollLoopAction {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

extension NIOAsyncSequenceProducerBackPressureStrategies {
/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
func didYield(bufferDepth: Int) -> Bool { true }
func didConsume(bufferDepth: Int) -> Bool { true }
}
}
2 changes: 1 addition & 1 deletion Tests/SwiftKafkaTests/KafkaProducerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ final class KafkaProducerTests: XCTestCase {
} catch {}

// This subscribes to the acknowledgements stream and immediately terminates the stream.
// Required to kill the run task.
// Required to terminate the run task.
var iterator: KafkaMessageAcknowledgements.AsyncIterator? = acks.makeAsyncIterator()
_ = iterator
iterator = nil
Expand Down

0 comments on commit 34f2d72

Please sign in to comment.