Skip to content

Commit

Permalink
Refactor KafkaProducer
Browse files Browse the repository at this point in the history
Motiviation:

* align `KafkaProducer` more with proposed changes to `KafkaConsumer`
* `AsyncStream` was not handling `AsyncSequence` termination handling as
  we wanted it to, so revert back to use `NIOAsyncSequenceProducer`

Modifications:

* make `KafkaProducer` `final class` instead of `actor`
* `KafkaProducer`: use `NIOAsyncSequenceProducer` instead of
  `AsyncSequence` for better termination handling -> shutdown
  `KafkaProducer` on termination of the `AsyncSequence`
* introduce `StateMachine` to `KafkaProducer`
* move internal state of `KafkaProducer` to `KafkaProducer.StateMachine`
* remove unused `await` expressions when accessing `KafkaProducer`
* update tests
* update `README`
  • Loading branch information
felixschlegel committed Jun 22, 2023
1 parent 1173b07 commit d6805a7
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 110 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can late
```swift
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])

let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements(
let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
config: config,
logger: .kafkaTest // Your logger here
)
Expand All @@ -25,7 +25,7 @@ await withThrowingTaskGroup(of: Void.self) { group in

// Task receiving acknowledgements
group.addTask {
let messageID = try await producer.sendAsync(
let messageID = try producer.sendAsync(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
Expand Down
Loading

0 comments on commit d6805a7

Please sign in to comment.