materialize-kafka: new connector #2196

Merged: 12 commits into main from wb/materialize-kafka on Dec 17, 2024

Conversation

@williamhbaker (Member) commented Dec 11, 2024

Description:

New connector for materializing data to Kafka topics. It can encode messages as JSON, or as Avro when a schema registry is configured.

There were initial attempts at an exactly-once mode of operation, in which Kafka producer transactions were used and checkpoints were written to a checkpoints topic that could later be recovered. This was taken out as a simplification, and because of complications that would likely arise from Kafka transaction timeouts, which limit how long a transaction can remain open. Because of that timeout, an at-least-once, non-transactional mode of operation will be needed no matter what, so the connector is being released initially with only that capability; transaction support can be added later if there is an actual need for it.

Field selection works a little differently for this connector than for SQL materializations, for example: here, all top-level fields are recommended by default, so the default path leaves users with their entire document materialized nicely without having to do anything special. It is also possible to de-select top-level fields, or to include additional nested fields if that is needed for some reason.

Closes #2138

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

I will write new documentation for the connector along with merging this PR.

Notes for reviewers:

(anything that might help someone review this PR)



An initial working version of the Kafka materialization, which materializes
documents as Avro messages.
Override the default behavior of logging librdkafka errors at the ERROR level and
log them at DEBUG instead. If there is actually a fatal error, the connector will
encounter it through its normal processing and error out with the typical
logging.
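
As a rough sketch of what that override can look like with rust-rdkafka's ClientContext hook (illustrative only, not the connector's actual code; QuietContext is a made-up name, and a real producer would pair this with a ProducerContext implementation):

use rdkafka::client::ClientContext;
use rdkafka::error::KafkaError;

struct QuietContext;

impl ClientContext for QuietContext {
    // librdkafka surfaces client-level errors here; the default implementation
    // logs them at ERROR. Log them at DEBUG instead, since truly fatal errors
    // will still show up through the normal produce/poll results.
    fn error(&self, error: KafkaError, reason: &str) {
        tracing::debug!(%error, reason, "librdkafka error");
    }
}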
Adds an option for messages to be encoded as JSON if a schema registry isn't
available, or just if JSON encoding is preferred.
Adds snapshot tests for the `Spec` command and also some simple verification of
materialized documents.
Adds the dockerfile and CI build steps for materialize-kafka.

I ended up not trying to share code from source-kafka right now since it was
only needed in the tests, and have instead copy/pasted the Avro-to-JSON
translation logic directly into the materialize-kafka tests file.
The synthetic flow_published_at field is recommended by default in all other
materializations, so we'll recommend it here as well.

The concept of a "synthetic" field is somewhat interesting when considering
validation constraints, but for current practical purposes the only ones to be
concerned with are flow_published_at and flow_document.
It may be helpful to have a little more logged information about checkpoint
recovery, particularly to get an idea of how much compaction is done on
the checkpoints topic over time.

The higher-level FutureProducer posed some performance challenges, so the
lower-level ThreadedProducer has been swapped in.

It seems like the intention with the FutureProducer is to create futures for all
of your messages, and then await all of them individually. This is mostly a
memory problem, since we'd have to create a future for every message in a
transaction, or implement some kind of more complicated system of backpressure.

The ThreadedProducer works more directly with the producer queue, and we apply
some simple backpressure via a `poll` call to the producer when that queue is
full. This seems to work well in my own testing, and has a good amount of
throughput.
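
In sketch form, that backpressure loop looks roughly like the following (simplified for illustration; send_with_backpressure is a hypothetical helper rather than the connector's actual function):

use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::{BaseRecord, DefaultProducerContext, ThreadedProducer};
use rdkafka::util::Timeout;

fn send_with_backpressure(
    producer: &ThreadedProducer<DefaultProducerContext>,
    topic: &str,
    key: &[u8],
    payload: &[u8],
) -> Result<(), KafkaError> {
    let mut record = BaseRecord::to(topic).key(key).payload(payload);
    loop {
        match producer.send(record) {
            Ok(()) => return Ok(()),
            // The local queue is full: poll the producer so librdkafka can drain
            // delivery events, then retry with the record that `send` handed back.
            Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), returned)) => {
                producer.poll(Timeout::Never);
                record = returned;
            }
            Err((err, _)) => return Err(err),
        }
    }
}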

The checkpoint persistence and recovery code has been ripped out, for two
reasons: First, the ThreadedProducer is a little more complicated to use
(particularly in async handling), and I didn't particularly feel like figuring
out how to use it in the same way that the prior implementation with the
FutureProducer worked. And second, Kafka has a concept of a "transaction
timeout", which is the maximum length of time a transaction can be open. There's
always going to be some timeout, although it could be on the order of tens of
minutes. But obviously this is problematic for the idea of materializations
needing to handle unbounded transaction sizes, and we were always going to need
some kind of "non transactional" mode of operation that features at-least-once
message delivery. This initial version of the materialization connector will
only have that at-least-once mode, and we can revisit exactly-once later if
there is demand for it.
Connecting to AWS MSK with the Rust Kafka library we are using requires the
client to call "poll" so that an auth token is generated. But we need to use an
admin client to create topics during the materialization `Apply`, and there is
no way that I could find to call "poll" with the admin client to generate a
token.

It's possible that I'm missing something here, but I tried pretty hard to make
it work. If it turns out that anybody wants to use this connector with MSK we
can revisit this later. For now I think it's best to remove MSK as an option,
since it isn't going to work well for most users unless they somehow pre-create
all of the topics the connector needs, which is the only other option I can
think of.
Take the selected key fields as-is, without unescaping ~0 or ~1 when building
the slice of key pointers used to create the Avro schema.

This fixes some inconsistencies with key serialization when key fields contain
actual slashes in the names.
@williamhbaker marked this pull request as ready for review on December 11, 2024 21:01
@mdibaiee (Member) left a comment:

LGTM in general, just a few questions

@@ -0,0 +1,41 @@
[package]
@mdibaiee (Member):

I suppose at some point we will have a workspace in this repository and the connectors will share their dependencies, like the crates do in the flow repository and the Go connectors do here. Does that sound right?

@williamhbaker (Member, Author):

Definitely! I had experimented a little with doing that in this PR but decided to hold off for the time being. There's some re-org/cleanup we could do with the common packages for the Go connectors as well. I'm thinking it'll be a bit of general work for adapting the repo to more of a multi-language format than we have previously assumed.

// The selected key fields are treated as unescaped strings. For
// example, a key field like `some/nested~1key` is taken directly as
// a property named "some/nested~1key" with that pointer rather than
// unescaping the ~1 into a nested pointer of ("some", "nested/key").
@mdibaiee (Member):

I'm not sure I fully understand the implication of this, what kind of scenario does this cover?

@williamhbaker (Member, Author) commented Dec 17, 2024:

If you have a document with a structure like this:

{
  "some": {
    "nested/key": "value"
  } 
}

You can select the "nested/key" field, which is nested in the containing "some" object, and Flow provides a flattened alias for that field in the string "some/nested~1key" - the "nesting" is represented by slashes, and actual slashes in field names are escaped as ~1.

SQL materializations etc. take that alias directly and create a column for it. We want to do the same thing with Kafka, creating a top-level property if a user explicitly selects a nested field. (Nested fields are not recommended by default, since the full structure of all top-level fields is materialized by default, so there's not really any reason to select nested fields, but you could if you want to.)

The use of Token::from_str is in contrast to using Pointer::from_str more directly, since Pointer::from_str would interpret such a string as representing a nested structure rather than just treating it as the desired name of the field.

It's all pretty obscure and not likely to come up very often, but it was an edge case I uncovered in my testing.
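
To make the distinction concrete, here is a standalone illustration (plain Rust, not the connector's code or Flow's doc crate) of the two ways such a flattened field name can be read:

// "~1" is the JSON Pointer escape for "/" and "~0" for "~".
fn as_pointer_tokens(field: &str) -> Vec<String> {
    // Interpreting the name as a pointer splits on "/" and unescapes each token.
    field
        .split('/')
        .map(|t| t.replace("~1", "/").replace("~0", "~"))
        .collect()
}

fn as_literal_token(field: &str) -> String {
    // Taking the name as-is keeps a single top-level property.
    field.to_string()
}

fn main() {
    // Pointer interpretation: a nested structure ("some", then "nested/key").
    assert_eq!(as_pointer_tokens("some/nested~1key"), vec!["some", "nested/key"]);
    // Literal interpretation: one property named "some/nested~1key".
    assert_eq!(as_literal_token("some/nested~1key"), "some/nested~1key");
}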

Input, Output,
};

pub async fn run_transactions(mut input: Input, mut output: Output, open: Open) -> Result<()> {
@mdibaiee (Member):

I imagine we want to extract this into a library at some point for writing Rust connectors? This seems like materialize-boilerplate, but for Rust.

@williamhbaker (Member, Author):

Yep for sure. Probably with the next Rust materialization we write, whenever that is, especially if it needs to do standard updates.

materialize-kafka/src/transactor.rs (conversation resolved)
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
tracing::debug!("polling producer since queue is full");
producer.poll(Timeout::Never);
msg = original_message;
@mdibaiee (Member):

is this line necessary? 🤔

@williamhbaker (Member, Author):

producer.send takes ownership of its msg argument, so without this line the borrow checker complains that msg has been moved on subsequent iterations of the retry loop. The producer.send error response gives back the message it took ownership of, and reassigning it to msg lets the next round of the loop use that message (which is really the same one) rather than the moved variable.

@williamhbaker merged commit e569f42 into main on Dec 17, 2024
53 of 54 checks passed
@williamhbaker deleted the wb/materialize-kafka branch on December 17, 2024 21:04
Successfully merging this pull request may close these issues.

materialize-kafka: new materialization with support for Avro and schema registry
2 participants