Refactor KafkaProducer #67

@felixschlegel felixschlegel commented Jun 22, 2023

Fixes: #64

Motivation:

  • align KafkaProducer more with proposed changes to KafkaConsumer
  • AsyncStream did not handle AsyncSequence termination the way
    we wanted, so revert to using NIOAsyncSequenceProducer

Modifications:

  • make KafkaProducer final class instead of actor
  • KafkaProducer: use NIOAsyncSequenceProducer instead of
    AsyncSequence for better termination handling -> shutdown
    KafkaProducer on termination of the AsyncSequence
  • introduce StateMachine to KafkaProducer
  • move the internal state of KafkaProducer to KafkaProducer.StateMachine
  • remove unused await expressions when accessing KafkaProducer
  • update tests
  • update README
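The move from an actor to a final class whose state lives in a lock-protected state machine could look roughly like the following. This is a minimal sketch, not the PR's code: `LockedBox` is a hypothetical stand-in for whatever locked-value type the project uses (the review snippets only show a `withLockedValue` call), and `SketchProducer` and the simplified `StateMachine` with its `flushCompleted` method are illustrative assumptions.

```swift
import Foundation

/// Hypothetical stand-in for a lock-protected value box.
/// Only the `withLockedValue` name is taken from the PR snippets.
final class LockedBox<Value> {
    private let lock = NSLock()
    private var value: Value

    init(_ value: Value) { self.value = value }

    /// Runs `body` with exclusive access to the stored value.
    func withLockedValue<T>(_ body: (inout Value) throws -> T) rethrows -> T {
        lock.lock()
        defer { lock.unlock() }
        return try body(&value)
    }
}

/// Simplified, hypothetical state machine holding the producer's state.
struct StateMachine {
    enum State: Equatable { case started, flushing, finished }
    private(set) var state: State = .started

    /// Shutdown was requested: stop accepting messages, start flushing.
    mutating func finish() {
        if state == .started { state = .flushing }
    }

    /// The poll loop found nothing left to send.
    mutating func flushCompleted() {
        if state == .flushing { state = .finished }
    }
}

/// A final class instead of an actor: callers need no `await`,
/// and every state access goes through the lock.
final class SketchProducer {
    private let stateMachine = LockedBox(StateMachine())

    func triggerGracefulShutdown() {
        stateMachine.withLockedValue { $0.finish() }
    }

    func flushCompleted() {
        stateMachine.withLockedValue { $0.flushCompleted() }
    }

    var state: StateMachine.State {
        stateMachine.withLockedValue { $0.state }
    }
}
```

Keeping the transitions inside a value-type state machine means the class itself only decides when to take the lock, which is what makes the synchronous (non-`await`) accessors possible.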

@felixschlegel felixschlegel mentioned this pull request Jun 27, 2023
11 tasks
@felixschlegel felixschlegel force-pushed the fs-kafka-producer-remove-async-stream branch from 75e5b42 to 83d7077 Compare June 28, 2023 09:12
let action = self.stateMachine.withLockedValue { $0.finish() }
switch action {
case .shutdownGracefullyAndFinishSource(let client, let source, let topicHandles):
Task {
Contributor

We can't do this. Can we instead do the same thing for flush as you did with commitSync and close, i.e. use the queue and a callback? Then we can use a continuation somewhere. We probably need to get this termination event over to KafkaProducer's run method somehow.
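The "queue and a callback, then a continuation" idea from this comment can be sketched as follows. This is only an illustration of the bridging pattern: `closeWithCallback` and `awaitClose` are hypothetical names, and the dispatch queue stands in for whatever queue the real client reports completion on.

```swift
import Dispatch

/// Hypothetical callback-based operation: it reports completion
/// asynchronously on the given queue instead of blocking the caller.
func closeWithCallback(
    on queue: DispatchQueue,
    completion: @escaping @Sendable (Result<Void, Error>) -> Void
) {
    queue.async { completion(.success(())) }
}

/// Bridges the callback into async/await with a continuation,
/// so no thread is blocked while waiting for the completion.
func awaitClose(on queue: DispatchQueue) async throws {
    try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
        closeWithCallback(on: queue) { result in
            // The continuation must be resumed exactly once.
            continuation.resume(with: result)
        }
    }
}
```

A caller would simply write `try await awaitClose(on: queue)`; the unstructured `Task { ... }` in the quoted code is then no longer needed for this step.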

Contributor Author

Unfortunately, I haven't found any async alternative to rd_kafka_flush like there is with rd_kafka_consumer_close_queue or rd_kafka_commit_queue.

What I have done, however, is get rid of rd_kafka_flush and imitate it as well as possible using our StateMachine and our run loop. This is very basic compared to the actual rd_kafka_flush implementation: once KafkaProducer.shutdownGracefully has been invoked, I just keep polling until there are no more events in librdkafka's outgoing queue. At the moment there are no timeouts whatsoever. Please let me know what you think.
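The polling approach described here can be illustrated with a small, synchronous sketch. Everything in it is an assumption made for illustration: `FakeClient` pretends to serve one enqueued item per poll, and `flushByPolling` is a hypothetical helper rather than the PR's run loop. Like the approach described above, it has no timeout.

```swift
/// Hypothetical stand-in for the Kafka client.
final class FakeClient {
    /// Number of items still waiting in the outgoing queue.
    private(set) var outgoingQueueSize: Int

    init(enqueued: Int) { self.outgoingQueueSize = enqueued }

    /// Pretends to serve one enqueued item per poll.
    func poll() {
        if outgoingQueueSize > 0 { outgoingQueueSize -= 1 }
    }
}

enum ProducerState { case started, flushing, finished }

/// Keeps polling while flushing until the outgoing queue is empty,
/// then moves to `.finished`. Returns the number of polls performed.
/// Note: there is no timeout, so a queue that never drains loops forever.
func flushByPolling(client: FakeClient, state: inout ProducerState) -> Int {
    var polls = 0
    while state == .flushing {
        if client.outgoingQueueSize > 0 {
            client.poll()
            polls += 1
        } else {
            state = .finished
        }
    }
    return polls
}
```

A deadline or a maximum number of iterations could be added to the loop condition if unbounded flushing turns out to be a problem.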

client: KafkaClient,
messageIDCounter: UInt,
source: Producer.Source?,
topicHandles: [String: OpaquePointer]
Contributor

Could we abstract the handles into a class as well so we avoid the OpaquePointer here?
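One way to read this suggestion is a small class that owns the dictionary and hands out handles on demand, so the state machine stores the class instead of raw pointers. The sketch below is an assumption about the shape of such a wrapper, not the code that was merged: `TopicHandles` and its injected `create`/`destroy` closures are hypothetical, and the closures stand in for the librdkafka calls that would create and destroy a topic handle.

```swift
/// Hypothetical wrapper that hides `OpaquePointer` topic handles
/// behind a class and releases them when the wrapper goes away.
final class TopicHandles {
    private var handles: [String: OpaquePointer] = [:]
    private let create: (String) -> OpaquePointer?
    private let destroy: (OpaquePointer) -> Void

    /// `create` and `destroy` are injected so the sketch does not
    /// depend on librdkafka; a real wrapper would call into the C API.
    init(
        create: @escaping (String) -> OpaquePointer?,
        destroy: @escaping (OpaquePointer) -> Void
    ) {
        self.create = create
        self.destroy = destroy
    }

    /// Number of cached handles.
    var count: Int { handles.count }

    /// Returns the cached handle for `topic`, creating it on first use.
    func handle(for topic: String) -> OpaquePointer? {
        if let existing = handles[topic] { return existing }
        guard let created = create(topic) else { return nil }
        handles[topic] = created
        return created
    }

    deinit {
        // Release every handle exactly once.
        for handle in handles.values { destroy(handle) }
    }
}
```

Tying cleanup to `deinit` keeps the pointer lifetime in one place, which is the main benefit over passing a `[String: OpaquePointer]` dictionary around.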

/// Poll client for new consumer messages.
case poll(client: KafkaClient)
/// Kill the poll loop.
case killPollLoop
Contributor

Can we use a different word than kill here, such as terminate?

@felixschlegel felixschlegel force-pushed the fs-kafka-producer-remove-async-stream branch 3 times, most recently from ce6a38e to 29415e1 Compare June 29, 2023 15:53
@felixschlegel felixschlegel requested a review from FranzBusch June 30, 2023 09:32
Comment on lines 201 to 202
public func shutdownGracefully() {
self.stateMachine.withLockedValue { $0.finish() }
Contributor

Let's call this triggerGracefulShutdown, since we are not actually waiting for the shutdown here. Once we adopt lifecycle, we should make this method non-public.

topicHandles: topicHandles
)
case .flushing, .finished:
throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer")
Contributor

Can we please create a new, separate error here for when we are flushing, one that indicates that we are currently shutting down and flushing?
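Splitting the combined `.flushing, .finished` case so that each state reports its own error might look like this. It is a sketch under assumptions: `SketchKafkaError`, its `shutdownInProgress` case, the reason strings other than the one quoted above, and `checkCanSend` are all hypothetical names chosen for illustration.

```swift
/// Hypothetical error type; only `connectionClosed(reason:)` mirrors
/// the name used in the reviewed code.
enum SketchKafkaError: Error, Equatable {
    case shutdownInProgress(reason: String)
    case connectionClosed(reason: String)
}

enum SendState { case started, flushing, finished }

/// Throws a state-specific error when a message cannot be produced.
func checkCanSend(in state: SendState) throws {
    switch state {
    case .started:
        return
    case .flushing:
        // The producer is still draining its queue but accepts nothing new.
        throw SketchKafkaError.shutdownInProgress(
            reason: "Producer is shutting down and flushing"
        )
    case .finished:
        throw SketchKafkaError.connectionClosed(
            reason: "Tried to produce a message with a closed producer"
        )
    }
}
```

With separate cases, a caller can tell "shutdown is in progress" apart from "the producer is already closed" without parsing the reason string.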

@felixschlegel felixschlegel requested a review from FranzBusch June 30, 2023 11:01
Motivation:

* align `KafkaProducer` more with proposed changes to `KafkaConsumer`
* `AsyncStream` did not handle `AsyncSequence` termination the way
  we wanted, so revert to using `NIOAsyncSequenceProducer`

Modifications:

* make `KafkaProducer` `final class` instead of `actor`
* `KafkaProducer`: use `NIOAsyncSequenceProducer` instead of
  `AsyncSequence` for better termination handling -> shutdown
  `KafkaProducer` on termination of the `AsyncSequence`
* introduce `StateMachine` to `KafkaProducer`
* move internal state of `KafkaProducer` to `KafkaProducer.StateMachine`
* remove unused `await` expressions when accessing `KafkaProducer`
* update tests
* update `README`
Modifications:

* move `NoBackPressure` struct to `extension` of
  `NIOAsyncSequenceProducerBackPressureStrategies`
* break down duplicate `ShutDownOnTerminate` type into two more
  specialised types for `KafkaConsumer` and `KafkaProducer`
* add missing `config` parameter to `KafkaProducer`'s initialiser
Modifications:

* create new class `RDKafkaTopicHandles` that wraps a dictionary
  containing all topic names with their respective `rd_kafka_topic_t` handles
* create method `KafkaClient.produce` wrapping the `rd_kafka_produce`
  method in a Swift way
Modifications:

* `KafkaClient`: add new property `outgoingQueueSize`
* `KafkaProducer.StateMachine`: add new state `.flushing`
* `KafkaProducer.shutdownGracefully()`:
    * make non-async
    * remove invocation to `rd_kafka_flush`
    * set `KafkaProducer.StateMachine.State` to `.flushing`
* `KafkaProducer` poll loop:
    * poll as long as `outgoingQueueSize` is > 0 to send out any
      enqueued `KafkaProducerMessage`s and serve any enqueued callbacks
* `KafkaProducerTests`: add test asserting that the `librdkafka` `outq`
  is still being served after `KafkaProducer.shutdownGracefully` has
  been invoked as long as there are enqueued items
Modifications:

* rename `KafkaProducer.shutdownGracefully` to
  `KafkaProducer.triggerGracefulShutdown`
* `KafkaProducer.send`: throw a separate error when in state `.flushing`
@felixschlegel felixschlegel force-pushed the fs-kafka-producer-remove-async-stream branch from b0e7d9e to c9b4d03 Compare June 30, 2023 12:43
@FranzBusch FranzBusch merged commit 5ed295f into swift-server:main Jul 3, 2023
@FranzBusch FranzBusch deleted the fs-kafka-producer-remove-async-stream branch July 3, 2023 09:35
2 participants