Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

README: improve KafkaProducer examples #76

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,46 @@ Finally, add `import SwiftKafka` to your source code.

### Producer API

After creating the `KafkaProducer`, messages can be sent to a `topic` using the `send(_:)` method.

```swift
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])

let producer = try KafkaProducer.makeProducer(
config: config,
logger: .kafkaTest // Your logger here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We normally just do this

Suggested change
logger: .kafkaTest // Your logger here
logger: logger

)

await withThrowingTaskGroup(of: Void.self) { group in

// Run Task
group.addTask {
try await producer.run()
}

// Task sending messages
group.addTask {
try producer.send(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
)
)

// Required
await producer.shutdownGracefully()
Comment on lines +54 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really want to get rid of this by folding it into the run() method. Maybe let's not add this to the example here.

}
}
```

#### Acknowledgements

The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.

```swift
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])

let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements(
let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
config: config,
logger: .kafkaTest // Your logger here
)
Expand All @@ -42,9 +76,9 @@ await withThrowingTaskGroup(of: Void.self) { group in
try await producer.run()
}

// Task receiving acknowledgements
// Task sending messages and receiving acknowledgements
group.addTask {
let messageID = try await producer.send(
let messageID = try producer.send(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
Expand All @@ -55,8 +89,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
// Check if acknowledgement belongs to the sent message
}

// Required
await producer.shutdownGracefully()
// The producer shuts down automatically after consuming the acknowledgements
}
}
```
Expand Down
Loading