Support deserializing json in kafka source #344
@qingfei1994 The use case you mentioned, where strings need to be parsed to identify vertices or edges, is quite common, and having structured data upstream could greatly simplify the processing workflow. Not only would this result in a more robust data ingestion process, but it would also coincide with the trend of JSON being widely adopted as the data interchange format in various systems and services.

If you're interested in contributing this feature, I believe it would be a very welcome improvement. Should you have any design ideas, feel free to outline the design and implementation details, and engage with the community to discuss and refine the concept. Once again, thank you for your initiative and for contributing to the TuGraph project. If you need any further information or have additional questions, please do not hesitate to reach out. Looking forward to your proposal!
Thanks @Leomrlin! Here is my rough plan:
1. Add a "geaflow.dsl.kafka.format" option that can be configured as text or json. If it is set to "json", each Kafka message will be deserialized as JSON and converted into a collection of Row objects according to the table schema, within the fetch function of KafkaSourceTable.
2. Errors may occur when deserializing JSON, so we also need options to control error handling, such as 'geaflow.dsl.kafka.format.json.fail-on-missing-field' (true/false) and 'geaflow.dsl.kafka.format.json.ignore-parse-error'.
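To make the intended semantics of those two options concrete, here is a minimal Python sketch (GeaFlow itself is Java; the function name and signature are illustrative, only the option semantics come from the proposal above):

```python
import json

def deserialize_json(message, schema_fields,
                     fail_on_missing_field=False, ignore_parse_error=False):
    """Turn one raw Kafka message into a row (list of values) per the table
    schema. Returns None when a malformed record is skipped."""
    try:
        obj = json.loads(message)
    except json.JSONDecodeError:
        if ignore_parse_error:
            return None  # ignore-parse-error: silently skip malformed records
        raise
    row = []
    for field in schema_fields:
        if field not in obj:
            if fail_on_missing_field:
                raise ValueError(f"missing field: {field}")
            row.append(None)  # tolerant default: null for absent fields
        else:
            row.append(obj[field])
    return row
```

With fail-on-missing-field off, absent fields become nulls; with ignore-parse-error on, unparseable messages are dropped instead of failing the job.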
@qingfei1994 Your proposal for enhancing the JSON deserialization capabilities within our Kafka source is superb! I see it not just as a solution to the immediate need but also as laying the groundwork for a more robust deserialization framework. Our existing TableDeserializer interface is well suited to integrating different parsers, similar to the current TextDeserializer. By incorporating the JSON deserializer at this level, we ensure it can be used across different connectors within the TuGraph project, not just Kafka.

Regarding the configuration options, I agree that the deserializer may gain numerous parameters in the future. However, I suggest lifting the configuration to a more general level, such as geaflow.dsl.connector.format.json, as a strategic approach. This would let future JSON parsers keep configuration consistent across connectors and simplify the management of settings related to JSON deserialization. Perhaps in the future we can provide a uniform, flexible JSON deserialization error-handling strategy for the entire TuGraph system.

Once again, thank you for your proactive attitude and for contributing such thoughtful ideas to the TuGraph project. We are very much looking forward to your detailed design and subsequent implementation. Should you need any assistance or have further questions as you develop this feature, please do not hesitate to reach out to the community.
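The pluggable-deserializer idea above can be sketched as follows (Python sketch of a Java design; TableDeserializer and TextDeserializer are named in the discussion, while JsonDeserializer and the method signatures here are hypothetical):

```python
import json
from abc import ABC, abstractmethod

class TableDeserializer(ABC):
    """Common interface so each connector can plug in any format parser."""
    @abstractmethod
    def deserialize(self, message: str) -> list:
        """Convert one raw message into a row (list of column values)."""

class TextDeserializer(TableDeserializer):
    def deserialize(self, message: str) -> list:
        return [message]  # whole message as a single text column

class JsonDeserializer(TableDeserializer):
    def __init__(self, schema_fields):
        self.schema_fields = schema_fields
    def deserialize(self, message: str) -> list:
        obj = json.loads(message)
        # project JSON fields onto the table schema, null when absent
        return [obj.get(f) for f in self.schema_fields]
```

Because the interface lives at the table level rather than inside the Kafka connector, the same JsonDeserializer could serve other connectors (e.g. Pulsar) unchanged.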
Thanks @Leomrlin!
What do you think?
Good! I fully agree with your view that abstracting the configuration to a more general level, such as geaflow.dsl.connector.format.json, is a strategic move. This would ensure that future JSON parsers maintain consistent configuration across different connectors and make it easier to manage settings related to JSON deserialization. Such a design would let us integrate these configuration options into every connector within TuGraph, not just Kafka or Pulsar. I support your continued efforts to advance this proposal. If you need more feedback or encounter challenges while implementing these features, please do not hesitate to share them with us. The TuGraph community is always eager to help and to move forward together. Looking forward to your further progress!
Currently the Kafka source only supports StringDeserializer, so we have to use a special character to identify a vertex or an edge, something like this:
INSERT INTO dy_modern.person(id, name) SELECT cast(trim(split_ex(t1, ',', 0)) as bigint), split_ex(trim(t1), ',', 1) FROM ( Select trim(substr(text, 2)) as t1 FROM tbl_source WHERE substr(text, 1, 1) = '.' );
But we want to use a more structured format in a Kafka topic.
Could we support deserializing JSON in the Kafka source?
I'm willing to work on this if you think it's necessary.
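The ask above amounts to a format switch inside the source's fetch path, so that structured messages no longer need marker characters and split_ex calls. A minimal Python sketch (GeaFlow is Java; the function name and config dictionary are illustrative, only the 'geaflow.dsl.kafka.format' option comes from this discussion):

```python
import json

def fetch_rows(messages, config, schema_fields):
    """Convert a batch of raw Kafka messages into rows, choosing the
    parsing strategy from the proposed format option."""
    fmt = config.get("geaflow.dsl.kafka.format", "text")
    rows = []
    for msg in messages:
        if fmt == "json":
            obj = json.loads(msg)
            # map JSON fields directly onto the table schema
            rows.append([obj.get(f) for f in schema_fields])
        else:
            rows.append([msg])  # text: one column holding the raw line
    return rows
```

With format=json, a message like {"id": 1, "name": "Tom"} maps straight to the (id, name) columns, replacing the substr/split_ex parsing shown earlier.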