DBZ-8325 Emit DDL events #210

Closed
wants to merge 13 commits into from

Contributor

@twthorn twthorn commented Oct 15, 2024

Add support to emit DDL events.

The only way to emit DDL events currently is with historized connectors/configs/schemas because of this check.

I considered using the binlog schema/connector, but from what I can tell it is not appropriate, since we do not interact directly with binlogs (we receive protobuf gRPC events from Vitess). Although there is overlap, adapting it to our use case seemed like too much modification, so I opted for the more general historized versions.

There is no need for internal use of the schema history, as we receive a table map (i.e. field) event before each change. All we really want is a way to publish DDL events for external downstream users to consume, so they can stay up to date on changes that aren't simply DMLs, i.e. DDLs that change the rows (e.g. truncate table, drop partition). If there's a simpler way to do this, that would be preferred.

From what I can tell, to leverage the existing DDL parsers for MySQL we need the in-memory representation of the tables to be up to date; otherwise parsing does not generate DDL events. That is, on startup we need to take a schema snapshot or have some other way of finding the schema of the tables. Otherwise there is a race between receiving a row change (which includes a table map event) and receiving a DDL event: if the DDL is received before the row change/table map, it can't be parsed; if it is received after, it can. So it seems this does require full historized schema/connector/config support.
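
To make the ordering problem concrete, here is a minimal sketch in plain Java. It is not Debezium or Vitess code: `InMemoryTables`, `onTableMap`, and `onAddColumnDdl` are hypothetical names, and the "parser" is reduced to a lookup that only produces a result for a table it already knows about.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the connector's in-memory table representation. */
class InMemoryTables {
    // table name -> number of columns currently known for that table
    private final Map<String, Integer> columnCounts = new HashMap<>();

    /** Models a table map (field) event: it registers the table's current shape. */
    void onTableMap(String table, int columnCount) {
        columnCounts.put(table, columnCount);
    }

    /**
     * Models a DDL parser that can only describe an ADD COLUMN for a known table.
     * Returns an empty Optional when the table is unknown, i.e. no DDL event results.
     */
    Optional<String> onAddColumnDdl(String table) {
        Integer known = columnCounts.get(table);
        if (known == null) {
            return Optional.empty();
        }
        columnCounts.put(table, known + 1);
        return Optional.of(table + " now has " + (known + 1) + " columns");
    }
}
```

If `onAddColumnDdl` is called before `onTableMap` for the same table, the result is empty; called after, it is populated. That is the race described above, and a schema snapshot on startup would remove it by populating the map first.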

Note: we will refactor debezium-connector-mysql/binlog so we don't need these; the parsing can move to another module to be shared. This is an initial PR to get early feedback.

Note: the itests trivially fail here since we modified dependencies in the main Debezium repo. The PR needed for these changes to work is debezium/debezium#5939. It removes the converters, as suggested in this thread.

Contributor Author

twthorn commented Oct 15, 2024

Two integration tests are failing due to issues with parsing 0000-00-00 dates; I will look into these. The rest pass with these changes.

Contributor Author

twthorn commented Oct 16, 2024

All integration tests pass locally with the changes in debezium/debezium#5939

Contributor

@HenryCaiHaiying HenryCaiHaiying left a comment

Some minor comments.


@Override
public String schemaChangeTopic() {
    String topicName;
Contributor

It doesn't seem you are using topicName in the method

Contributor Author

Nice catch, updated!

        return overrideSchemaChangeTopicName;
    }
    else {
        return prefix;
Contributor

What's the value of prefix in this case?

Contributor Author

Good point, updated java doc!
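
For readers following this thread without the full diff, the fallback being discussed can be sketched roughly as below. This is an illustration, not the PR's code: the class name `SchemaChangeTopicNaming` is hypothetical, the null/empty check is an assumption (the actual condition is not visible in the excerpt), and `prefix` is assumed to be the connector's configured topic prefix.

```java
/** Hypothetical sketch of the override-or-prefix fallback discussed in this thread. */
class SchemaChangeTopicNaming {
    private final String prefix;
    private final String overrideSchemaChangeTopicName;

    SchemaChangeTopicNaming(String prefix, String overrideSchemaChangeTopicName) {
        this.prefix = prefix;
        this.overrideSchemaChangeTopicName = overrideSchemaChangeTopicName;
    }

    /** Returns the override when one is set (assumed: non-null and non-empty), else the prefix. */
    String schemaChangeTopic() {
        if (overrideSchemaChangeTopicName != null && !overrideSchemaChangeTopicName.isEmpty()) {
            return overrideSchemaChangeTopicName;
        }
        else {
            return prefix;
        }
    }
}
```

Returning directly from each branch also avoids the unused local variable flagged earlier in the review.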

Contributor

@HenryCaiHaiying HenryCaiHaiying left a comment

lgtm

Contributor

@jpechane jpechane left a comment

@twthorn I created the 3.1 branch so the PR could be redirected to be merged to it.
The PR looks functional but I am afraid I am not really happy about the idea of using database history just to bypass core checks or hook into the code.
If possible, I would definitely prefer changes to the core that would allow a connector without a historized schema to publish its schema changes too (PostgreSQL would benefit from it, for example). CommonConnectorConfig could be used for that, for example.
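
As a rough illustration of the kind of core change suggested here, the decision could reduce to a check like the one below. Everything in it is hypothetical: `SchemaChangePublishing`, `shouldPublish`, and the `publishWithoutHistory` flag are illustrative names and not existing Debezium core API.

```java
/** Hypothetical sketch; none of these names are taken from Debezium core. */
class SchemaChangePublishing {
    /**
     * Decides whether a schema change event should be published.
     *
     * @param historizedSchema      whether the connector keeps a schema history
     * @param publishWithoutHistory hypothetical opt-in that a common connector config could expose
     */
    static boolean shouldPublish(boolean historizedSchema, boolean publishWithoutHistory) {
        // Historized connectors keep today's behaviour; others publish only when they opt in.
        return historizedSchema || publishWithoutHistory;
    }
}
```

With a flag like this, a connector that has no schema history could still emit schema change events without adopting the historized schema classes.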

@@ -368,6 +433,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the topic.prefix used for the data change topic.");

public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic")

<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
Contributor

IMHO this dependency is not needed as it is a transitive one for debezium-connector-mysql

Contributor Author

twthorn commented Nov 7, 2024

@jpechane Thank you for the review.

I will work on:

  1. Create a PR in debezium/debezium-core: add a config that allows connectors to publish schema change events even if their schemas are not historized
  2. Create a new PR for debezium-connector-vitess: publish simple schema change events without parsing DDLs, i.e. with an empty tableChanges field (note: we cannot use the MySQL parser because it requires the in-memory representation of each table to already exist, from a snapshot, a CREATE TABLE, or consuming from the schema history topic, and we will not be supporting these as they are unnecessary). Here is an example event:
SourceRecord{sourcePartition={server=test_server}, sourceOffset={vgtid=[{"keyspace":"test_unsharded_keyspace","shard":"0","gtid":"MySQL56/3260b633-9c9a-11ef-b302-0242ac110002:1-228","table_p_ks":[]}]}} ConnectRecord{topic='test_server', kafkaPartition=0, key=Struct{databaseName=test_unsharded_keyspace}, keySchema=Schema{io.debezium.connector.vitess.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=3.0.0-SNAPSHOT,connector=vitess,name=test_server,ts_ms=1730937407000,db=,ts_us=1730937407000000,ts_ns=1730937407000000000,keyspace=test_unsharded_keyspace,table=ddl_table,shard=0,vgtid=[{"keyspace":"test_unsharded_keyspace","shard":"0","gtid":"MySQL56/3260b633-9c9a-11ef-b302-0242ac110002:1-229","table_p_ks":[]}]},ts_ms=1730937407400,databaseName=test_unsharded_keyspace,ddl=alter table ddl_table add column new_column_name INT,tableChanges=[]}, valueSchema=Schema{io.debezium.connector.vitess.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
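
A minimal sketch of how the value of such a "simple" event could be assembled, using plain Java collections rather than the Kafka Connect `Struct` API. The class and method names are hypothetical; only the field names `databaseName`, `ddl`, and `tableChanges` come from the example event above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: a schema change value that carries the raw DDL and no parsed table changes. */
class SimpleSchemaChangeEvent {
    static Map<String, Object> value(String keyspace, String ddl) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("databaseName", keyspace);
        value.put("ddl", ddl);
        // The DDL is passed through unparsed, so there is nothing to report here.
        value.put("tableChanges", Collections.emptyList());
        return value;
    }
}
```

Because the DDL string is forwarded as-is, no in-memory table representation is needed, so the ordering of DDL events relative to table map events no longer matters.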

As a follow-up change (separate from both of these, to keep each one small in scope), we could create another form of the parser that doesn't rely on schema history.

Contributor Author

twthorn commented Nov 8, 2024

Closing in favor of
debezium/debezium#5979
#215

3 participants