
Check that at least one message has been consumed before breaking to prevent hanging on messages that are larger than max_bytes #155

Open · wants to merge 1 commit into master

Conversation

@kjvalencik

Check that at least one message has been consumed before breaking to prevent hanging on messages that are larger than max_bytes.

Steps to reproduce:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \
      --data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8082/topics/test"

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
      --data '{"id": "my_instance", "format": "binary", "auto.offset.reset": "smallest"}' \
      http://localhost:8082/consumers/my_binary_consumer

curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
      http://localhost:8082/consumers/my_binary_consumer/instances/my_instance/topics/test?max_bytes=1

Expected: Response contains one message.
Actual: Response contains no messages.
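
For reference, the change is essentially a one-condition fix to the read loop. A minimal sketch of the before/after behavior, using illustrative names rather than the actual kafka-rest internals (the real code also carries any unreturned message over to the next read):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: drains messages until roughly maxResponseBytes.
    final class ReadLoopSketch {
        static List<byte[]> drain(List<byte[]> pending, long maxResponseBytes) {
            List<byte[]> out = new ArrayList<>();
            long bytesConsumed = 0;
            for (byte[] msg : pending) {
                // Before: break whenever the next message would push the response
                // past max_bytes, so a message that is by itself larger than
                // max_bytes is never returned and the consumer appears to hang.
                // After: additionally require bytesConsumed > 0, so the first
                // message is always returned even if it exceeds max_bytes.
                if (bytesConsumed + msg.length > maxResponseBytes && bytesConsumed > 0) {
                    break;
                }
                out.add(msg);
                bytesConsumed += msg.length;
            }
            return out;
        }
    }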

@ConfluentJenkins
Contributor

Can one of the admins verify this patch?

@ghost

ghost commented Feb 8, 2016

@confluentinc It looks like @kjvalencik just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@kjvalencik
Author

@ewencp Is this something that is acceptable to merge?

@ewencp
Contributor

ewencp commented Mar 9, 2016

@kjvalencik This is a parameter that the normal Java clients actually enforce, refusing to return data if it exceeds the configured size. In fact, brokers are also supposed to filter messages that are too large, such that you need to coordinate settings across producer, broker, and consumer to get larger messages to make it through the full pipeline.

This can potentially be pretty significant. If I configure the broker (or one topic on the broker) to accept larger messages, I can do things like produce a 1GB message. Then consumers that might not be expecting such large messages can easily run into memory issues. Of course, the 1GB example is arbitrary: if you remove the size limit in order to return at least one message, then the client has to accept arbitrarily large messages.

I'm not necessarily opposed to merging a change like this, but max_bytes definitely expresses an intent that no more than that amount of data (even if REST proxy can only enforce it roughly) should be returned. There's another issue filed that suggests providing a min_bytes parameter to allow for more push-like behavior. Would setting max_bytes to a large value and combining it with a min_bytes parameter to get more prompt responses to consume requests be a better solution? In that case you would explicitly be allowing for much larger messages (or still be able to cap their size!), but be explicitly requesting a faster response if only a smaller amount of data is available.
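
For illustration, a sketch of how a min_bytes parameter could combine with max_bytes in the consume loop. min_bytes does not exist in kafka-rest today, and all names here are assumptions:

    // Hypothetical: decides whether a consume request can respond now.
    final class FetchReadiness {
        static boolean responseReady(long bytesConsumed, long minBytes,
                                     long maxBytes, boolean timedOut) {
            if (bytesConsumed >= maxBytes) return true; // hard cap reached: stop
            if (bytesConsumed >= minBytes) return true; // enough data for a prompt reply
            return timedOut;                            // otherwise hold until the timeout
        }
    }

Under these semantics, a client could set max_bytes high enough for its largest expected message while still getting prompt responses once min_bytes of data is available.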

@kjvalencik
Author

The issue I am having is that I have a couple of topics with wildly
different message sizes, where each message takes roughly the same amount of time to process.

With the normal Java consumer, this is okay because health checks are not
tightly coupled to pulling data. It is possible to set a max bytes value that is
larger than your largest message. Then, if this results in a very large
number of messages, you can continue to heartbeat while you drain the
queue.

But this isn't possible in kafka-rest, because pulling messages is the only
way to heartbeat. If you are unable to process your local queue in time,
the consumer will time out, a rebalance will occur, and you won't be able to
commit offsets. This starts the cycle over.

There are a few possible solutions:

  • Allow returning a single message that exceeds the REST max bytes, but not
    the internal consumer max bytes (what this PR does)
  • Add an endpoint that allows heartbeats without pulling messages
  • Allow committing arbitrary offsets. This helps the least because you
    still have unnecessary rebalances.

One last option would be to implement a max messages limit. Because the max bytes provided via the query parameter already forces the REST proxy to buffer, this doesn't really differ much in practice. However, it is a big departure from the Java client philosophy, so I wouldn't recommend it.

Thanks!

@gawth

gawth commented Mar 9, 2016

Having looked into this area as part of my pull request, I agree with @ewencp that it would be a fairly significant change.

With regard to the heartbeat, we're using max bytes to our advantage. We have a heartbeat thread that makes a request to kafka-rest for messages with max bytes set to 1 byte. This will never return a message (none of our messages are that small!) but keeps the consumer active. Would that work for you?

Alan
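
For concreteness, a sketch of such a heartbeat thread, reusing the consumer instance from the reproduction steps above; the one-minute period is an assumption, not part of gawth's setup:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class RestHeartbeat {
        public static void main(String[] args) {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8082/consumers/my_binary_consumer"
                    + "/instances/my_instance/topics/test?max_bytes=1"))
                .header("Accept", "application/vnd.kafka.binary.v1+json")
                .GET()
                .build();
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    // max_bytes=1 returns no messages (none are that small) but
                    // still counts as consumer activity, preventing a timeout.
                    client.send(request, HttpResponse.BodyHandlers.discarding());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 0, 1, TimeUnit.MINUTES);
        }
    }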

@kjvalencik
Author

I had considered that workaround. The issue with that approach is that it is a very expensive way to do a heartbeat (possibly due to one of these: #154, #162), although I haven't had a chance to investigate.

We are already seeing significant CPU usage with the current load (consuming two CPU cores all of the time, with 15 seconds of lag time between batches), and I am concerned that the additional load would be unreasonable.

In response to @gawth: I think my preferred change would be a dedicated, lightweight heartbeat endpoint. This could also potentially decrease load, because we would be able to make fewer calls to fetch.

In the meantime, I'm content continuing to use a forked branch with this change.

@gawth

gawth commented Mar 9, 2016

We've got a low number of topics at the moment, so the number of health checks we need to do hasn't been an issue for us.

How many topics/partitions are you dealing with?
How frequently would you be carrying out the health check?
What's your current request timeout on kafka-rest?
What's your consumer timeout configured to?

Ta,
Alan

@kjvalencik
Author

  • How many topics/partitions are you dealing with? ~50 topics, 30 partitions each
  • How frequently would you be carrying out the health check? We are using the default consumer timeout (5 minutes), so I don't see a reason to do it more than once a minute.
  • What's your current request timeout on kafka-rest? Default 1s.
  • What's your consumer timeout configured to? Default 5 minutes.

Also, we're running 9 instances of kafka-rest. We aren't scaled wide at the moment, so most topics only have a couple of consumers.

@kjvalencik
Author

@gawth How would you feel about this change if it were updated to make maxBytes=0 a special case?

Logic (see the sketch after the list):

  • If msg bytes + read bytes > max bytes
    • if max bytes = 0 then return
    • if read bytes > 0 then return
    • else continue reading
  • else continue reading
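
A minimal sketch of that decision, with illustrative names (treating maxBytes == 0 as the proposed special case, which would preserve strict enforcement for heartbeat-style requests):

    // Sketch only: whether to stop reading before adding the next message.
    final class StopDecision {
        static boolean shouldStop(long msgBytes, long readBytes, long maxBytes) {
            if (msgBytes + readBytes > maxBytes) {
                if (maxBytes == 0) return true; // special case: never return oversized data
                if (readBytes > 0) return true; // already have data; respect the cap
                return false;                   // first message: read it even if oversized
            }
            return false;                       // under the cap: keep reading
        }
    }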

@alecswan

We just spent two days troubleshooting a slightly different problem caused by the same code. Yesterday we observed that the consumer lag on some partitions kept growing. The REST client was requesting 32MB max_bytes, the REST proxy consumer was configured to read up to 64MB, and the message that caused the problem was 40MB in size.

The code change in this PR would have fixed our issue. I also understand the concern that this change may cause some existing clients to experience higher memory usage. However, as it stands right now, the client has no way to determine whether the topic has no more messages or the consumer is just stuck on a message larger than max_bytes. At the very minimum, an exception should be thrown and converted to a 429 or some 5xx response code if (bytesConsumed == 0 && roughMsgSize > maxResponseBytes).
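
A minimal sketch of that fallback; the exception type and the 429/5xx mapping are the commenter's suggestion, not existing kafka-rest APIs:

    // Hypothetical guard and exception; all names are illustrative.
    class MessageTooLargeException extends RuntimeException {
        MessageTooLargeException(String message) { super(message); }
    }

    final class OversizedMessageGuard {
        // Would be called in the read loop instead of silently returning nothing.
        static void check(long bytesConsumed, long roughMsgSize, long maxResponseBytes) {
            if (bytesConsumed == 0 && roughMsgSize > maxResponseBytes) {
                // Fail loudly so a handler can map this to 429 or a 5xx, letting
                // clients distinguish "no messages" from "message too large".
                throw new MessageTooLargeException("next message (" + roughMsgSize
                    + " bytes) exceeds max_bytes=" + maxResponseBytes);
            }
        }
    }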


cla-assistant bot commented Sep 12, 2024

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
