-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
materialize-kafka: new connector #2196
Conversation
An initial working version of the Kafka materialization, that materializes documents as Avro messages.
Override the default behavior of logging on librdkafka errors as ERROR logs and log them as debug instead. If there's actually a fatal error the connector will encounter that through its normal processing and error out with the typical logging.
Adds an option for messages to be encoded as JSON if a schema registry isn't available, or just if JSON encoding is preferred.
Adds snapshot tests for the `Spec` command and also some simple verification of materialized documents.
Adds the dockerfile and CI build steps for materialize-kafka. I ended up not trying to share code from source-kafka right now since it was only needed in the tests, and have instead copy/pasted the Avro to JSON translating logic directly into the materialize-kafka tests file.
The synthetic flow_published_at field is recommended by default in all other materializations, so we'll recommend it here as well. The concept of a "synthetic" field is somewhat interesting when considering validation constraints, but for current practical purposes the only ones to be concerned with are flow_published_at and flow_document.
It may be helpful to have a little more logged information about checkpoint recovery, particularly to get an idea of how much compaction is done on the checkpoints topic over time.
…ints The higher-level FutureProducer posed some performance challenges, and so the more low-level ThreadedProducer has been swapped in. It seems like the intention with the FutureProducer is to create futures for all of your messages, and then await all of them individually. This is mostly a memory problem, since we'd have to create a future for every message in a transaction, or implement some kind of more complicated system of backpressure. The ThreadedProducer works more directly with the producer queue, and we apply some simple backpressure via a `poll` call to the producer when that queue is full. This seems to work well in my own testing, and has a good amount of throughput. The checkpoint persistence and recovery code has been ripped out, for two reasons: First, the ThreadedProducer is a little more complicated to use (particularly in async handling), and I didn't particularly feel like figuring out how to use it in the same way that the prior implementation with the FutureProducer worked. And second, Kafka has a concept of a "transaction timeout", which is the maximum length of time a transaction can be open. There's always going to be some timeout, although it could be on the order of 10's of minutes. But obviously this is problematic for the idea of materializations needing to handle unbounded transaction sizes, and we were always going to need some kind of "non transactional" mode of operation that features at-least-once message delivery. This initial version of the materialization connector will only have that at-least-once mode, and we can revisit exactly-once later if there is demand for it.
a728c2d
to
d67cde6
Compare
Connecting to AWK MSK with the rust Kafka library we are using requires the client to call "poll" so that an auth token is generated. But we need to use an admin client to create topics during the materialization `Apply`, and there is no way that I could find to call "poll" with the admin client to generate a token. It's possible that I'm missing something here, but I tried pretty hard to make it work. If it it turns out that anybody wants to use this connector with MSK we can revisit this later. For now I think it's best to remove MSK as an option, since it isn't going to work well for most users unless they somehow pre-create all of the topics the connector needs, which is the only other option I can think of.
Take the selected key fields as-is, without unescaping ~0 or ~1 when building the slice of key pointers used to create the Avro schema. This fixes some inconsistencies with key serialization when key fields contain actual slashes in the names.
d67cde6
to
70e0ea4
Compare
70e0ea4
to
750aa09
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM in general, just a few questions
@@ -0,0 +1,41 @@ | |||
[package] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose at some point we will have a workspace in this repository and the connectors will share their dependencies, like the crates do in flow
repository and the go connectors do here, does that sound right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely! I had experimented a little with doing that in this PR but decided to hold off for the time being. There's some re-org/cleanup we could do with the common packages for the Go connectors as well. I'm thinking it'll be a bit of general work for adapting the repo to more of a multi-language format than we have previously assumed.
// The selected key fields are treated as unescaped strings. For | ||
// example, a key field like `some/nested~1key` is taken directly as | ||
// a property named "some/nested~1key" with that pointer rather than | ||
// unescaping the ~1 into a nested pointer of ("some", "nested/key"). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I fully understand the implication of this, what kind of scenario does this cover?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you have a document with a structure like this:
{
"some": {
"nested/key": "value"
}
}
You can select the "nested/key"
field, which is nested in the containing "some"
object, and Flow provides a flattened alias for that field that in the string "some/nested~1key"
- the "nesting" is represented by slashes, and actual slashes in the field names are escaped as ~1
.
SQL materializations etc. take that alias directly and create a column for it. We want to do the same thing with Kafka, creating a top-level property if a user explicitly selects a nested field (nested fields are not recommended by default, since the full structure of all top-level fields is materialized by default, so there's not really any reason to select nested fields, but you could if you want to).
The use of Token::from_str
is in contrast to using Pointer::from_str
more directly since Pointer::from_str
would interpret such a string as representing a nested structure rather than just treating it as the desired named of the field.
It's all pretty obscure and not likely to come up very often, but it was an edge case I uncovered in my testing.
Input, Output, | ||
}; | ||
|
||
pub async fn run_transactions(mut input: Input, mut output: Output, open: Open) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I imagine we want to extract this into a library at some point for writing Rust connectors? this seems like materialize-boilerplate
, but for Rust
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep for sure. Probably with the next Rust materialization we write, whenever that is, especially if it needs to do standard updates.
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => { | ||
tracing::debug!("polling producer since queue is full"); | ||
producer.poll(Timeout::Never); | ||
msg = original_message; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this line necessary? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
producer.send
takes ownership of its msg
argument, so without this line the borrow checker complains that on subsequent iterations of the retry loop msg
has been moved. The producer.send
error response gives you back the message it took ownership of and this reassignment of that to the original msg
lets the next round of the loop use that one (which is the same one, really) rather than the moved variable.
Description:
New connector for materializing data to Kafka topics. It can encode messages as JSON, or Avro when there is a schema registry configured.
There were initial attempts made at an exactly-once mode of operation, where Kafka producer transactions were used and checkpoints were written to a checkpoints topic that could then be recovered. This was taken out as a simplification, and because of complications that would likely arise from the existence of Kafka transaction timeouts, which limit how long transactions can be open. An at-least-once non-transactional mode of operation will be needed no matter what because of this timeout consideration, and so the connector is being initially released with only that capability and we can add transaction support later if there is an actual need for it.
Field selection is a little different for this connector than for example SQL materializations: Here, all top-level fields are recommended by default, so that the default path is for users to end up with their entire document materialized nicely without having to do anything different. It is also possible to de-select top-level fields, or include additional nested fields if that is needed for some reason.
Closes #2138
Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
I will write new documentation for the connector along with merging this PR.
Notes for reviewers:
(anything that might help someone review this PR)
This change is