Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to detect consumption error while using KafkaConsumerObservable? #182

Open
yeryomenkom opened this issue Apr 9, 2020 · 3 comments
Open

Comments

@yeryomenkom
Copy link

Hi!
I couldn't find any test in the lib source that tests the behaviour of KafkaConsumerObservable in case of kafka shutdown. Can you explain me when KafkaConsumerObservable should fail?
Currently I am not able to make KafkaConsumerObservable fail...

@Avasil
Copy link
Collaborator

Avasil commented Apr 9, 2020

It could fail if poll fails so you could try replicating one of the conditions

@yeryomenkom
Copy link
Author

Yeah. I saw it in the code.
But in general I am expecting that this test should pass: https://github.com/monix/monix-kafka/pull/183/files

@Avasil
Copy link
Collaborator

Avasil commented Apr 18, 2020

It seems like consumer.poll hangs if there is no conenction:

  test("observable should become completed after kafka broker shutdown") {
    EmbeddedKafka.start()
    assert(EmbeddedKafka.isRunning)
    val props = new Properties()
    props.setProperty("bootstrap.servers", "127.0.0.1:6001")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(topicsRegex.pattern)

    EmbeddedKafka.stop()
    assert(!EmbeddedKafka.isRunning)

    consumer.poll(0)
  }

We probably have to listen for shutdown in a separate thread

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants