-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor KafkaProducer
#67
Refactor KafkaProducer
#67
Conversation
75e5b42
to
83d7077
Compare
let action = self.stateMachine.withLockedValue { $0.finish() } | ||
switch action { | ||
case .shutdownGracefullyAndFinishSource(let client, let source, let topicHandles): | ||
Task { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't do this. Can we instead do the same thing for flush as you did with commitSync and close i.e. using the queue and a callback. Then we can use a continuation somewhere. We somehow need to get this termination event over to the KafkaProducer
s run method probably.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, I haven't found any async alternative to rd_kafka_flush
like there is with rd_kafka_consumer_close_queue
or rd_kafka_commit_queue
.
What I have done however is get rid of rd_kafka_flush
and imitate this method as good as possible using our StateMachine
and our run loop. However, what I have done is very basic compared to the actual rd_kafka_flush
implementation. I am basically just polling until there are no more events in librdkafka
's outgoing queue when KafkaProducer.shutdownGracefully
has been invoked. Atm there are no timeouts
whatsoever. Please let me know what you think.
client: KafkaClient, | ||
messageIDCounter: UInt, | ||
source: Producer.Source?, | ||
topicHandles: [String: OpaquePointer] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we abstract the handles into a class as well so we avoid the OpaquePointer
here?
/// Poll client for new consumer messages. | ||
case poll(client: KafkaClient) | ||
/// Kill the poll loop. | ||
case killPollLoop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use a different word than kill here like terminate?
ce6a38e
to
29415e1
Compare
public func shutdownGracefully() { | ||
self.stateMachine.withLockedValue { $0.finish() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's call this triggerGracefulShutdown
since we are actually not waiting for the shutdown here. Once we adopt lifecycle we should make this method non public
topicHandles: topicHandles | ||
) | ||
case .flushing, .finished: | ||
throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create a new separate error here when we flushing which indicates that we are currently shutting down and flushing please
Motiviation: * align `KafkaProducer` more with proposed changes to `KafkaConsumer` * `AsyncStream` was not handling `AsyncSequence` termination handling as we wanted it to, so revert back to use `NIOAsyncSequenceProducer` Modifications: * make `KafkaProducer` `final class` instead of `actor` * `KafkaProducer`: use `NIOAsyncSequenceProducer` instead of `AsyncSequence` for better termination handling -> shutdown `KafkaProducer` on termination of the `AsyncSequence` * introduce `StateMachine` to `KafkaProducer` * move internal state of `KafkaProducer` to `KafkaProducer.StateMachine` * remove unused `await` expressions when accessing `KafkaProducer` * update tests * update `README`
Modifications: * move `NoBackPressure` struct to `extension` of `NIOAsyncSequenceProducerBackPressureStrategies` * break down duplicate `ShutDownOnTerminate` type into two more specialised types for `KafkaConsumer` and `KafkaProducer` * add missing `config` parameter to `KafkaProducer`'s initialiser
Modifications: * create new class `RDKafkaTopicHandles` that wraps a dictionary containing all topic names with their respective `rd_kafka_topic_t` handles * create method `KafkaClient.produce` wrapping the `rd_kafka_produce` method in a Swift way
Modifications: * `KafkaClient`: add new property `outgoingQueueSize` * `KafkaProducer.StateMachine`: add new state `.flushing` * `KafkaProducer.shutdownGracefully()`: * make non-async * remove invocation to `rd_kafka_flush` * set state to `KafkaProducer.StateMachine.State` to `.flushing` * `KafkaProducer` poll loop: * poll as long as `outgoingQueueSize` is > 0 to send out any enqueued `KafkaProducerMessage`s and serve any enqueued callbacks * `KafkaProducerTests`: add test asserting that the `librdkafka` `outq` is still being served after `KafkaProducer.shutdownGracefully` has been invoked as long as there are enqueued items
Modifications: * rename `KafkaProducer.shutdownGracefully` to `KafkaProducer.triggerGracefulShutdown` * `KafkaProducer.send` separate error message when in state `.flushing`
b0e7d9e
to
c9b4d03
Compare
Motiviation:
KafkaProducer
more with proposed changes toKafkaConsumer
AsyncStream
was not handlingAsyncSequence
termination handling aswe wanted it to, so revert back to use
NIOAsyncSequenceProducer
Modifications:
KafkaProducer
final class
instead ofactor
KafkaProducer
: useNIOAsyncSequenceProducer
instead ofAsyncSequence
for better termination handling -> shutdownKafkaProducer
on termination of theAsyncSequence
StateMachine
toKafkaProducer
KafkaProducer
toKafkaProducer.StateMachine
await
expressions when accessingKafkaProducer
README