This repository has been archived by the owner on Mar 24, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 232
Support for headers #945
Comments
kafka version: 1.0.0 When I consume a topic(with header), after the commit, the brokers reports errors: [2019-10-29 15:46:06,031] ERROR [KafkaApi-109] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=xxx,partitions=[{partition=5,fetch_offset=26618513,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596)
at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
at java.lang.Thread.run(Thread.java:744) |
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Is there any way to access the Kafka record headers? I'm talking about this.
The text was updated successfully, but these errors were encountered: