This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Support for headers #945

Open
rolisz opened this issue May 13, 2019 · 1 comment


@rolisz

rolisz commented May 13, 2019

Is there any way to access the Kafka record headers? I'm talking about this.

@imperio-wxm

@rolisz

kafka version: 1.0.0
pykafka version: 2.8.0

When I consume a topic whose records carry headers, the broker reports the error below after the commit.
It looks like the message format is not compatible:

[2019-10-29 15:46:06,031] ERROR [KafkaApi-109] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=xxx,partitions=[{partition=5,fetch_offset=26618513,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v0 does not support record headers
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
	at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
	at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
	at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
	at scala.Option.map(Option.scala:146)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
	at scala.Option.flatMap(Option.scala:171)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
	at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
	at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
	at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
	at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
	at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
	at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
	at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
	at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
	at java.lang.Thread.run(Thread.java:744)
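
The trace goes through `downConvert` and fails with "Magic v0 does not support record headers", which suggests the broker is trying to rewrite the stored records into the oldest message format for this fetch. Below is a minimal sketch of that rule as I understand it. It assumes that Fetch request versions 0-1 are answered in magic v0, versions 2-3 in magic v1, and version 4 or later in magic v2, and that only magic v2 can carry headers. The function names are hypothetical and are not part of Kafka or pykafka.

```python
# Illustrative model only: these helpers are hypothetical, not Kafka or pykafka APIs.

def max_magic_for_fetch_version(fetch_version: int) -> int:
    """Newest message-format magic the broker may return for a Fetch request version.

    Assumed mapping: Fetch v0-v1 -> magic 0, v2-v3 -> magic 1, v4+ -> magic 2.
    """
    if fetch_version <= 1:
        return 0
    if fetch_version <= 3:
        return 1
    return 2


def can_serve_fetch(stored_magic: int, has_headers: bool, fetch_version: int) -> bool:
    """Return False when down-conversion would have to drop into a format without headers."""
    # The broker never up-converts; it returns the older of the two formats.
    target_magic = min(stored_magic, max_magic_for_fetch_version(fetch_version))
    # Record headers exist only in magic v2, so any lower target cannot hold them.
    return not (has_headers and target_magic < 2)


# Records stored as magic v2 with headers, fetched with an old request version:
print(can_serve_fetch(stored_magic=2, has_headers=True, fetch_version=0))
# The same records fetched with a request version that understands magic v2:
print(can_serve_fetch(stored_magic=2, has_headers=True, fetch_version=4))
```

If this model is right, the error would come from the client's fetch request version rather than from the topic itself, and records without headers on the same topic would still be served.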
