KafkaProducer: events- instead of acks sequence (#96)
* `KafkaProducer`: event instead of acks sequence

Motivation:

`KafkaProducer`: we want to expose a general `KafkaProducerEvent` type
in the `AsyncSequence` that was formerly just for message
acknowledgements. This enables us to add more events such as statistics
in the future.

The reason behind having a single `AsyncSequence` for all event types
instead of having separate `AsyncSequence`s for each event type is that
we need to ensure that all events that we subscribe to in `librdkafka`
are actually consumed by the `AsyncSequence` in our `KafkaProducer`
type. Otherwise we could run out of memory.

By handing the user the entire events `AsyncSequence`, we let them decide
whether they want to consume an event or drop it.

> **Note**: Logs are consumed independently of the
> `KafkaProducerEvents` `AsyncSequence`.
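
As a rough illustration of the single-sequence design, here is a minimal sketch that does not use SwiftKafka at all. `DemoEvent`, its `statistics` case, `collectDeliveryIDs` and `makeDemoEvents` are hypothetical stand-ins, and a plain `AsyncStream` takes the place of the sequence the producer hands out. The point is only that one sequence carries every event kind and the consumer decides per event whether to handle or drop it.

```swift
// Hypothetical stand-in for an event enum; not the SwiftKafka type.
enum DemoEvent: Sendable {
    case deliveryReports([UInt]) // IDs of the messages that were reported on
    case statistics(String)      // Imagined future event kind
}

/// Drains a single event sequence, keeping the delivery report IDs and
/// dropping every other event kind.
func collectDeliveryIDs(from events: AsyncStream<DemoEvent>) async -> [UInt] {
    var ids: [UInt] = []
    for await event in events {
        switch event {
        case .deliveryReports(let reported):
            ids.append(contentsOf: reported)
        default:
            break // Taken off the sequence, then dropped
        }
    }
    return ids
}

/// Builds a finite demo stream that mixes both event kinds.
func makeDemoEvents() -> AsyncStream<DemoEvent> {
    AsyncStream { continuation in
        continuation.yield(.deliveryReports([1, 2]))
        continuation.yield(.statistics("{}"))
        continuation.yield(.deliveryReports([3]))
        continuation.finish()
    }
}
```

Because every event passes through the same loop, an event kind the caller does not care about is still removed from the buffer instead of accumulating in a second sequence that nobody reads.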

Modifications:

* create a new `enum` `KafkaProducerEvent`
* rename `KafkaMessageAcknowledgements` -> `KafkaProducerEvents`
* rename `KafkaProducer.makeProducerWithAcknowledgements` ->
  `.makeProducerWithEvents`
* update tests
* update README

* Review Franz

Modifications:

* fix documentation typos
* `KafkaProducerEvent`: replace factory method with `init`
* create new type `KafkaProducerMessageStatus` effectively representing
  if a `KafkaProducerMessage` was acknowledged or not
* fix README nit

* `KafkaDeliveryReport` type

Modifications:

* create new type `KafkaDeliveryReport` containing a message's status
  and its id
* remove ID from `KafkaAcknowledgedMessage`
* remove `KafkaAcknowledgedMessageError`
* rename `KafkaProducerEvent.deliveryReport` -> `.deliveryReports`
* update README
* update tests
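
The effect of moving the ID onto the delivery report can be sketched with simplified, hypothetical mirrors of the types listed above. `DemoMessageID`, `DemoDeliveryReport` and `process(_:pending:)` are assumptions made for illustration only; the real types carry more information and are defined in the diff below.

```swift
// Hypothetical, simplified mirrors of the types named above.
struct DemoMessageID: Hashable, Sendable {
    var rawValue: UInt
}

struct DemoDeliveryReport: Sendable {
    enum Status: Sendable {
        case acknowledged(topic: String)
        case failure(reason: String)
    }

    var status: Status
    var id: DemoMessageID
}

/// Removes every reported message from `pending` and returns the IDs
/// whose delivery failed. Reports for unknown IDs are skipped.
func process(
    _ reports: [DemoDeliveryReport],
    pending: inout Set<DemoMessageID>
) -> [DemoMessageID] {
    var failed: [DemoMessageID] = []
    for report in reports {
        // The ID lives on the report itself, so successes and failures
        // are correlated with the sent message in the same way.
        guard pending.remove(report.id) != nil else { continue }
        if case .failure = report.status {
            failed.append(report.id)
        }
    }
    return failed
}
```

A caller would insert the ID returned by `send(_:)` into `pending` and pass each batch of reports through a function like this one.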

* Fix documentation nits

* Review Gus

Modifications:

* fix typos
* rename `KafkaProducerEvents.KafkaProducerEventAsyncIterator` ->
  `KafkaProducerEvents.AsyncIterator`
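
For readers unfamiliar with the naming convention behind this rename, the following sketch shows the general shape, under the assumption that a plain `AsyncStream` is wrapped instead of the NIO sequence used in this repository. `DemoEvents` is a hypothetical type.

```swift
/// Hypothetical wrapper that hides the underlying stream type behind
/// its own `AsyncSequence` conformance.
struct DemoEvents<Element>: AsyncSequence {
    let wrapped: AsyncStream<Element>

    // Naming the nested type `AsyncIterator` matches the protocol's
    // associated type, so it is spelled `DemoEvents.AsyncIterator`.
    struct AsyncIterator: AsyncIteratorProtocol {
        var wrappedIterator: AsyncStream<Element>.Iterator

        mutating func next() async -> Element? {
            await self.wrappedIterator.next()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(wrappedIterator: self.wrapped.makeAsyncIterator())
    }
}
```

With this naming the nested type and the associated type requirement share one name, which avoids a longer, redundant spelling at the use site.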

* Fix typo

* Fix (letter) case nit

Co-authored-by: Franz Busch <[email protected]>

felixschlegel and FranzBusch authored Jul 26, 2023
1 parent 5b07fe2 commit f8cb0a0
Showing 11 changed files with 239 additions and 217 deletions.
15 changes: 10 additions & 5 deletions README.md
@@ -29,14 +29,14 @@ Both the `KafkaProducer` and the `KafkaConsumer` implement the [`Service`](https

### Producer API

The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.
The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `events` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.

```swift
let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
var config = KafkaProducerConfiguration()
config.bootstrapServers = [broker]

let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
let (producer, events) = try KafkaProducer.makeProducerWithEvents(
config: config,
logger: logger
)
@@ -53,7 +53,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
try await serviceGroup.run()
}

// Task receiving acknowledgements
// Task sending message and receiving events
group.addTask {
let messageID = try producer.send(
KafkaProducerMessage(
@@ -62,8 +62,13 @@ await withThrowingTaskGroup(of: Void.self) { group in
)
)

for await acknowledgement in acknowledgements {
// Check if acknowledgement belongs to the sent message
for await event in events {
switch event {
case .deliveryReports(let deliveryReports):
// Check what messages the delivery reports belong to
default:
break // Ignore any other events
}
}
}
}
@@ -16,7 +16,7 @@ public struct KafkaProducerConfiguration {
// MARK: - SwiftKafka-specific Config properties

/// The time between two consecutive polls.
/// Effectively controls the rate at which incoming events and acknowledgments are consumed.
/// Effectively controls the rate at which incoming events are consumed.
/// Default: `.milliseconds(100)`
public var pollInterval: Duration = .milliseconds(100)

10 changes: 2 additions & 8 deletions Sources/SwiftKafka/KafkaAcknowledgedMessage.swift
@@ -17,10 +17,6 @@ import NIOCore

/// A message produced by the client and acknowledged by the Kafka cluster.
public struct KafkaAcknowledgedMessage {
/// The unique identifier assigned by the ``KafkaProducer`` when the message was send to Kafka.
/// The same identifier is returned by ``KafkaProducer/send(_:)`` and can be used to correlate
/// a sent message and an acknowledged message.
public var id: KafkaProducerMessageID
/// The topic that the message was sent to.
public var topic: String
/// The partition that the message was sent to.
@@ -34,16 +30,14 @@ public struct KafkaAcknowledgedMessage {

/// Initialize ``KafkaAcknowledgedMessage`` from `rd_kafka_message_t` pointer.
/// - Throws: A ``KafkaAcknowledgedMessageError`` for failed acknowledgements or malformed messages.
init(messagePointer: UnsafePointer<rd_kafka_message_t>, id: KafkaProducerMessageID) throws {
self.id = id

init(messagePointer: UnsafePointer<rd_kafka_message_t>) throws {
let rdKafkaMessage = messagePointer.pointee

let valueBufferPointer = UnsafeRawBufferPointer(start: rdKafkaMessage.payload, count: rdKafkaMessage.len)
self.value = ByteBuffer(bytes: valueBufferPointer)

guard rdKafkaMessage.err == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaAcknowledgedMessageError.fromRDKafkaError(messageID: self.id, error: rdKafkaMessage.err)
throw KafkaError.rdKafkaError(wrapping: rdKafkaMessage.err)
}

guard let topic = String(validatingUTF8: rd_kafka_topic_name(rdKafkaMessage.rkt)) else {
79 changes: 0 additions & 79 deletions Sources/SwiftKafka/KafkaAcknowledgedMessageError.swift

This file was deleted.

2 changes: 1 addition & 1 deletion Sources/SwiftKafka/KafkaConsumerMessage.swift
@@ -34,7 +34,7 @@ public struct KafkaConsumerMessage {
let rdKafkaMessage = messagePointer.pointee

guard let valuePointer = rdKafkaMessage.payload else {
fatalError("Could not resolve payload of acknowledged message")
fatalError("Could not resolve payload of consumer message")
}

let valueBufferPointer = UnsafeRawBufferPointer(start: valuePointer, count: rdKafkaMessage.len)
51 changes: 51 additions & 0 deletions Sources/SwiftKafka/KafkaDeliveryReport.swift
@@ -0,0 +1,51 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Crdkafka

/// A delivery report for a message that was sent to the Kafka cluster.
public struct KafkaDeliveryReport: Sendable, Hashable {
public enum Status: Sendable, Hashable {
/// The message has been successfully acknowledged by the Kafka cluster.
case acknowledged(message: KafkaAcknowledgedMessage)
/// The message failed to be acknowledged by the Kafka cluster and encountered an error.
case failure(KafkaError)
}

/// The status of a Kafka producer message after attempting to send it.
public var status: Status

/// The unique identifier assigned by the ``KafkaProducer`` when the message was sent to Kafka.
/// The same identifier is returned by ``KafkaProducer/send(_:)`` and can be used to correlate
/// a sent message with a delivery report.
public var id: KafkaProducerMessageID

internal init?(messagePointer: UnsafePointer<rd_kafka_message_t>?) {
guard let messagePointer else {
return nil
}

self.id = KafkaProducerMessageID(rawValue: UInt(bitPattern: messagePointer.pointee._private))

do {
let message = try KafkaAcknowledgedMessage(messagePointer: messagePointer)
self.status = .acknowledged(message: message)
} catch {
guard let error = error as? KafkaError else {
fatalError("Caught error that is not of type \(KafkaError.self)")
}
self.status = .failure(error)
}
}
}
68 changes: 32 additions & 36 deletions Sources/SwiftKafka/KafkaProducer.swift
@@ -41,26 +41,26 @@ extension KafkaProducerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
}
}

// MARK: - KafkaMessageAcknowledgements
// MARK: - KafkaProducerEvents

/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
public struct KafkaMessageAcknowledgements: AsyncSequence {
public typealias Element = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
/// `AsyncSequence` implementation for handling ``KafkaProducerEvent``s emitted by Kafka.
public struct KafkaProducerEvents: AsyncSequence {
public typealias Element = KafkaProducerEvent
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaProducerCloseOnTerminate>
let wrappedSequence: WrappedSequence

/// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
public struct AcknowledgedMessagesAsyncIterator: AsyncIteratorProtocol {
/// `AsynceIteratorProtocol` implementation for handling ``KafkaProducerEvent``s emitted by Kafka.
public struct AsyncIterator: AsyncIteratorProtocol {
var wrappedIterator: WrappedSequence.AsyncIterator

public mutating func next() async -> Element? {
await self.wrappedIterator.next()
}
}

public func makeAsyncIterator() -> AcknowledgedMessagesAsyncIterator {
return AcknowledgedMessagesAsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator())
public func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator())
}
}

@@ -72,7 +72,7 @@ public struct KafkaMessageAcknowledgements: AsyncSequence {
/// configuration object (only works if server has `auto.create.topics.enable` property set).
public final class KafkaProducer: Service, Sendable {
typealias Producer = NIOAsyncSequenceProducer<
Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>,
KafkaProducerEvent,
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
KafkaProducerCloseOnTerminate
>
@@ -104,7 +104,7 @@ public final class KafkaProducer: Service, Sendable {

/// Initialize a new ``KafkaProducer``.
///
/// This factory method creates a producer without message acknowledgements.
/// This factory method creates a producer without listening for events.
///
/// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
@@ -141,28 +141,28 @@ public final class KafkaProducer: Service, Sendable {
return producer
}

/// Initialize a new ``KafkaProducer`` and a ``KafkaMessageAcknowledgements`` asynchronous sequence.
/// Initialize a new ``KafkaProducer`` and a ``KafkaProducerEvents`` asynchronous sequence.
///
/// Use the asynchronous sequence to consume message acknowledgements.
/// Use the asynchronous sequence to consume events.
///
/// - Important: When the asynchronous sequence is deinited the producer will be shutdown and disallow sending more messages.
/// Additionally, make sure to consume the asynchronous sequence otherwise the acknowledgements will be buffered in memory indefinitely.
/// Additionally, make sure to consume the asynchronous sequence otherwise the events will be buffered in memory indefinitely.
///
/// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
/// `AsyncSequence` used for receiving message acknowledgements.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaProducerEvents``
/// `AsyncSequence` used for receiving message events.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
public static func makeProducerWithAcknowledgements(
public static func makeProducerWithEvents(
config: KafkaProducerConfiguration = KafkaProducerConfiguration(),
topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
logger: Logger
) throws -> (KafkaProducer, KafkaMessageAcknowledgements) {
) throws -> (KafkaProducer, KafkaProducerEvents) {
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))

let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
elementType: Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>.self,
elementType: KafkaProducerEvent.self,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
delegate: KafkaProducerCloseOnTerminate(stateMachine: stateMachine)
)
@@ -188,11 +188,11 @@ public final class KafkaProducer: Service, Sendable {
)
}

let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: sourceAndSequence.sequence)
return (producer, acknowlegementsSequence)
let eventsSequence = KafkaProducerEvents(wrappedSequence: sourceAndSequence.sequence)
return (producer, eventsSequence)
}

/// Start polling Kafka for acknowledged messages.
/// Start polling Kafka for events.
///
/// - Returns: An awaitable task representing the execution of the poll loop.
public func run() async throws {
@@ -208,18 +208,14 @@ public final class KafkaProducer: Service, Sendable {
let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() }
switch nextAction {
case .pollWithoutYield(let client):
// Drop any incoming acknowledgments
// Drop any incoming events
let _ = client.eventPoll()
case .pollAndYield(let client, let source):
let events = client.eventPoll()
for event in events {
switch event {
case .deliveryReport(let results):
// Ignore YieldResult as we don't support back pressure in KafkaProducer
results.forEach { _ = source?.yield($0) }
default:
break // Ignore
}
let producerEvent = KafkaProducerEvent(event)
// Ignore YieldResult as we don't support back pressure in KafkaProducer
_ = source?.yield(producerEvent)
}
try await Task.sleep(for: self.config.pollInterval)
case .flushFinishSourceAndTerminatePollLoop(let client, let source):
@@ -292,8 +288,8 @@ extension KafkaProducer {
source: Producer.Source?,
topicHandles: RDKafkaTopicHandles
)
/// Producer is still running but the acknowledgement asynchronous sequence was terminated.
/// All incoming acknowledgements will be dropped.
/// Producer is still running but the event asynchronous sequence was terminated.
/// All incoming events will be dropped.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case consumptionStopped(client: RDKafkaClient)
@@ -336,7 +332,7 @@ extension KafkaProducer {
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case pollWithoutYield(client: RDKafkaClient)
/// Poll client and yield acknowledgments if any received.
/// Poll client and yield events if any received.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
@@ -403,7 +399,7 @@ extension KafkaProducer {
topicHandles: topicHandles
)
case .consumptionStopped:
throw KafkaError.connectionClosed(reason: "Sequence consuming acknowledgements was abruptly terminated, producer closed")
throw KafkaError.connectionClosed(reason: "Sequence consuming events was abruptly terminated, producer closed")
case .finishing:
throw KafkaError.connectionClosed(reason: "Producer in the process of finishing")
case .finished:
@@ -419,8 +415,8 @@ extension KafkaProducer {
case finishSource(source: Producer.Source?)
}

/// The acknowledgements asynchronous sequence was terminated.
/// All incoming acknowledgements will be dropped.
/// The events asynchronous sequence was terminated.
/// All incoming events will be dropped.
mutating func stopConsuming() -> StopConsumingAction? {
switch self.state {
case .uninitialized:
@@ -431,7 +427,7 @@ extension KafkaProducer {
self.state = .consumptionStopped(client: client)
return .finishSource(source: source)
case .finishing(let client, let source):
// Setting source to nil to prevent incoming acknowledgements from buffering in `source`
// Setting source to nil to prevent incoming events from buffering in `source`
self.state = .finishing(client: client, source: nil)
return .finishSource(source: source)
case .finished: