[avro workaround] Pulling avro data but using an SMT to transform avro to json #256

Open
ee07dazn opened this issue Jan 25, 2023 · 0 comments

ee07dazn commented Jan 25, 2023

Hi,
I know I have already raised an issue asking for the ability to send Avro data to GCS using the GCS sink connector.
In the meantime I am trying a workaround, but I am hitting a strange error and wanted to see if you can shed some light on it.

The way I understand it, converters are separate from connectors: on the sink (consumer) side, a record passes through the deserialiser (converter) ---> SMT ---> connector (the GCS sink connector) and then on to the external system (GCS).
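
For context, the sink configuration I am describing looks roughly like this (a simplified sketch: the option keys are from memory and may not be the connector's exact names, and the transform class name is just a placeholder for my custom SMT):

name=cloudsql-proddb-gcs
connector.class=io.aiven.kafka.connect.gcs.GcsSinkConnector
topics=proddb.public.egg

# Avro converter, so each record's schema is fetched from Schema Registry and deserialised
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://<schema-registry-host>

# Placeholder for my custom SMT that turns each deserialised record into a JSON object
transforms=toJson
transforms.toJson.type=com.example.transforms.RecordToJson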

With the above in mind, I am still using the Avro converter in the connector settings so that the records are deserialised properly, followed by an SMT that converts each record into a JSON object the GCS sink connector can understand. However, I get the following error at the deserialisation step itself.

Do you know if the connector interacts with the converter in some way that could lead to this outcome?

PS: If I use the BigQuery sink connector instead, it just works with the same Avro converter settings.

2023-01-25 16:37:28,737 ERROR [cloudsql-proddb-gcs|task-0] WorkerSinkTask{id=cloudsql-proddb-gcs-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-cloudsql-proddb-gcs-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic proddb.public.egg to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:101)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 139
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:183)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:243)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:140)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:88)
    ... 17 more
Caused by: java.io.IOException: Error writing to server
    at java.base/sun.net.www.protocol.http.HttpURLConnection.writeRequests(HttpURLConnection.java:718)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.writeRequests(HttpURLConnection.java:730)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1613)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
    at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
    at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:415)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:408)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
    ... 20 more

For completeness, here is the earlier startup output from the same Connect worker:

Configuring Java heap: -Xms3629247365 -Xmx3629247365
+ exec /usr/bin/tini -w -e 143 -- /opt/kafka/bin/connect-distributed.sh /tmp/strimzi-connect.properties
consumer.sasl.jaas.config=[hidden]
consumer.sasl.mechanism=SCRAM-SHA-256
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be ignored.
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored.
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored.
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.RootResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored.
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
ahmedsobeh transferred this issue from Aiven-Open/gcs-connector-for-apache-kafka on Aug 27, 2024