
BE: Expose consumer's config props like request.timeout.ms #527

Open
2 tasks done
cpapad opened this issue Aug 23, 2024 · 11 comments

Labels
good first issue Up for grabs · hacktoberfest Issues good for hacktoberfest goal · scope/backend Related to backend changes · status/triage/completed Automatic triage completed · type/enhancement An enhancement/improvement to an already existing feature

Comments
@cpapad

cpapad commented Aug 23, 2024

Issue submitter TODO list

  • I've searched for already existing issues here
  • I'm running a supported version of the application which is listed here and the feature is not present there

Is your proposal related to a problem?

When we query large topics with CEL filters or free-text search, we run into the following error:

[Consumer clientId=kafbat-ui-consumer-1724336921133, groupId=null] Failed to close fetcher with a timeout(ms)=30000

To overcome this, we need to override the consumer's request timeout and other values.

2024-08-22 13:29:01,386 INFO  [boundedElastic-1] o.a.k.c.c.ConsumerConfig: ConsumerConfig values:
allow.auto.create.topics = false
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = earliest
bootstrap.servers = [kafka-stg-shared-workable-staging.aivencloud.com:24451]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = kafbat-ui-consumer-1724333341385
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.BytesDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /etc/kafka/stores/client.keystore.p12
ssl.keystore.password = [hidden]
ssl.keystore.type = PKCS12
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /etc/kafka/stores/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.BytesDeserializer

While admin client configuration options are exposed, as far as we can tell the consumer's timeout settings etc. cannot be overridden via configuration or environment variables.

Describe the feature you're interested in

Values to override the request timeout and related settings of the consumer used for querying topics, to avoid errors and timeouts on large topics.

Describe alternatives you've considered

No response

Version you're running

2956664 v1.0.0

Additional context

No response

@cpapad cpapad added status/triage Issues pending maintainers triage type/feature A brand new feature labels Aug 23, 2024
@kapybro kapybro bot added status/triage/manual Manual triage in progress area/topics status/triage/completed Automatic triage completed and removed status/triage Issues pending maintainers triage labels Aug 23, 2024

Hi cpapad! 👋

Welcome, and thank you for opening your first issue in the repo!

Please wait for triaging by our maintainers.

As development is carried out in our spare time, you can support us by sponsoring our activities or even funding the development of specific issues.
Sponsorship link

If you plan to raise a PR for this issue, please take a look at our contributing guide.

@Haarolean Haarolean added type/enhancement An enhancement/improvement to an already existing feature scope/backend Related to backend changes and removed type/feature A brand new feature area/topics status/triage/manual Manual triage in progress labels Aug 29, 2024
@Haarolean Haarolean changed the title Expose request.timeout.ms of the consumer used for querying to avoid timeouts on large topics BE: Expose request.timeout.ms of the consumer used for querying to avoid timeouts on large topics Aug 29, 2024
@Haarolean Haarolean changed the title BE: Expose request.timeout.ms of the consumer used for querying to avoid timeouts on large topics BE: Expose consumer's config props like request.timeout.ms Aug 29, 2024
@Haarolean Haarolean moved this to Todo in Up for grabs Oct 6, 2024
@Haarolean Haarolean added the good first issue Up for grabs label Oct 6, 2024
@Jaideep-C

Facing a similar problem; we need the ability to configure isolation.level.

@Haarolean Haarolean added the hacktoberfest Issues good for hacktoberfest goal label Oct 18, 2024
@helpmessage

I'm facing exactly the same problem as @Jaideep-C.
I'm testing Kafka Streams with EOS and need to change isolation.level to READ_COMMITTED to only see messages that "matter".

@Alex1OPS

@Haarolean, I faced the same issue, happy to contribute if it's still up for grabs.

@Haarolean Haarolean moved this from Todo to In Development in Up for grabs Dec 31, 2024
@Jaideep-C

Hi @Alex1OPS, how are you approaching exposing the configs?

@wernerdv
Contributor

wernerdv commented Jan 10, 2025

@Jaideep-C According to ConsumerGroupService you can specify the necessary consumer settings via the cluster properties, for example:

kafka:
  clusters:
    - name: local
      properties:
        isolation.level: read_committed

Can you test this?
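Note: cluster properties can typically also be supplied as environment variables using the KAFKA_CLUSTERS_<n>_PROPERTIES_* naming documented for cluster config. A minimal docker-compose-style sketch, assuming Spring's relaxed binding maps the variable back to the dotted isolation.level key:

environment:
  # assumed to bind to kafka.clusters[0].properties["isolation.level"]
  KAFKA_CLUSTERS_0_PROPERTIES_ISOLATION_LEVEL: read_committed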

@Alex1OPS

@wernerdv This would work, but cluster properties are also applied to the admin client, which can be undesirable. For example, the isolation.level property will be ignored with a corresponding log message.

Another pain point is that, in the current implementation, whatever is specified in the cluster property request.timeout.ms takes priority over adminClientTimeout. This can lead to confusion with the following config:

kafka:
  adminClientTimeout: 75000
  clusters:
    - name: local
      bootstrapServers: localhost:9092
      properties:
        isolation.level: read_committed
        request.timeout.ms: 600000

because the admin client timeout will be set to 600000 instead of the expected 75000.

I see three options:

  1. Do nothing: cluster properties already cover both cases mentioned in the issue, and both isolation.level and request.timeout.ms can be specified. Enriching the configuration section of the docs to describe the overriding behaviour and cluster properties might be a good idea for future reference.
  2. Fix property overriding for the admin config: if keeping adminClientTimeout is desirable, the current implementation should be slightly modified.
  3. Clearly separate admin and consumer client properties, e.g. introduce a consumer configuration section to the Cluster (see the sketch after this list). This is slightly more work but provides more clarity and flexibility. Admin and consumer clients are quite different: for example, a higher request timeout makes total sense for the consumer but can be undesirable for the admin client, because it disables failing fast in case of misconfiguration.
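A minimal sketch of what option 3 might look like, assuming a hypothetical consumerProperties section (the key name is illustrative, not an existing option):

kafka:
  clusters:
    - name: local
      bootstrapServers: localhost:9092
      properties:
        # applied to the admin client only
        request.timeout.ms: 30000
      consumerProperties:
        # applied to consumers only
        isolation.level: read_committed
        request.timeout.ms: 600000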

@wernerdv
Contributor

@Alex1OPS

E.g. introduce consumer configuration to the Cluster.

I like the third option.

@helpmessage

I've taken a look at the code and think that isolation.level=read_committed won't fix the message count presented, since it appears to be calculated as the sum of the last offsets of each topic/partition.

Above all, this is done by the admin client, not a consumer, and, as far as I know, the admin client does not support/use this property.

@Haarolean
Member

I've taken a look at the code and think that isolation.level=read_committed won't fix the message count presented, since it appears to be calculated as the sum of the last offsets of each topic/partition.

Above all, this is done by the admin client, not a consumer, and, as far as I know, the admin client does not support/use this property.

Whether or not exposing consumer configs solves this particular issue, we still plan on doing so (there's another case, #793, calling for this).

@Haarolean
Member

I see three options: […] 3. Clearly separate admin and consumer client properties. E.g. introduce consumer configuration to the Cluster.

Let's proceed with the third option. Could you also allow overriding both producer and consumer properties in your PR to address #793 as well?
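To make the agreed direction concrete, the earlier sketch could be extended with a producer section for #793 (both key names remain hypothetical):

kafka:
  clusters:
    - name: local
      bootstrapServers: localhost:9092
      consumerProperties:
        isolation.level: read_committed
      producerProperties:
        # an arbitrary producer override, purely for illustration
        enable.idempotence: false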
