[cdc] kafka cdc uses hive catalog to support field conversion to lowercase (apache#2016)

zhuangchong authored Sep 20, 2023
1 parent 27eada1 commit 14cb7b7
Showing 8 changed files with 153 additions and 22 deletions.

Changed file 1 of 8 (CDC record parser; hunks in extractRowData):

@@ -167,7 +167,7 @@ protected Map<String, String> extractRowData(
         Map<String, String> resultMap = new HashMap<>();
         linkedHashMap.forEach(
                 (key, value) -> {
-                    paimonFieldTypes.put(key, DataTypes.STRING());
+                    paimonFieldTypes.put(applyCaseSensitiveFieldName(key), DataTypes.STRING());
                     resultMap.put(key, value);
                 });

@@ -177,7 +177,9 @@ protected Map<String, String> extractRowData(
                     resultMap.put(
                             computedColumn.columnName(),
                             computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
-                    paimonFieldTypes.put(computedColumn.columnName(), computedColumn.columnType());
+                    paimonFieldTypes.put(
+                            applyCaseSensitiveFieldName(computedColumn.columnName()),
+                            computedColumn.columnType());
                 });

         return resultMap;
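
Every field name written into paimonFieldTypes, both raw fields and computed columns, now passes through applyCaseSensitiveFieldName; the MySQL-typed parser in file 2 applies the same conversion. The helper's body is not part of this diff. Judging from the inline ternary it replaces in the schema builder (file 8), it presumably reduces to a one-liner; a sketch, assuming the parser holds a caseSensitive flag:

    // Sketch only: the real helper is defined outside this diff. Assumed to keep
    // the name as-is for case-sensitive catalogs and lower-case it for
    // case-insensitive ones, such as the Hive catalog this commit targets.
    protected String applyCaseSensitiveFieldName(String fieldName) {
        return caseSensitive ? fieldName : fieldName.toLowerCase();
    }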

Changed file 2 of 8 (MySQL-typed record parser):

@@ -89,7 +89,7 @@ protected void extractFieldTypesFromDatabaseSchema() {
                 .forEachRemaining(
                         fieldName -> {
                             String fieldType = schema.get(fieldName).asText();
-                            fieldTypes.put(applyCaseSensitiveFieldName(fieldName), fieldType);
+                            fieldTypes.put(fieldName, fieldType);
                         });
         this.fieldTypes = fieldTypes;
     }

@@ -131,7 +131,9 @@ protected LinkedHashMap<String, DataType> setPaimonFieldType() {
         LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
         fieldTypes.forEach(
                 (name, type) ->
-                        paimonFieldTypes.put(name, MySqlTypeUtils.toDataType(type, typeMapping)));
+                        paimonFieldTypes.put(
+                                applyCaseSensitiveFieldName(name),
+                                MySqlTypeUtils.toDataType(type, typeMapping)));
         return paimonFieldTypes;
     }

@@ -168,7 +170,9 @@ protected Map<String, String> extractRowData(
             JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) {
         fieldTypes.forEach(
                 (name, type) ->
-                        paimonFieldTypes.put(name, MySqlTypeUtils.toDataType(type, typeMapping)));
+                        paimonFieldTypes.put(
+                                applyCaseSensitiveFieldName(name),
+                                MySqlTypeUtils.toDataType(type, typeMapping)));
         Map<String, Object> jsonMap =
                 OBJECT_MAPPER.convertValue(record, new TypeReference<Map<String, Object>>() {});
         if (jsonMap == null) {

@@ -191,7 +195,9 @@ protected Map<String, String> extractRowData(
             resultMap.put(
                     computedColumn.columnName(),
                     computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
-            paimonFieldTypes.put(computedColumn.columnName(), computedColumn.columnType());
+            paimonFieldTypes.put(
+                    applyCaseSensitiveFieldName(computedColumn.columnName()),
+                    computedColumn.columnType());
         }
         return resultMap;
     }

Changed file 3 of 8 (Kafka canal-json sync-database test):

@@ -18,6 +18,7 @@

 package org.apache.paimon.flink.action.cdc.kafka;

+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.DataType;

@@ -570,4 +571,42 @@ public void testCatalogAndTableConfig() {
         assertThat(action.tableConfig())
                 .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
+
+    @Test
+    @Timeout(60)
+    public void testCaseInsensitive() throws Exception {
+        final String topic = "case-insensitive";
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Write the Canal json into Kafka -------------------
+        writeRecordsToKafka(
+                topic, readLines("kafka/canal/database/case-insensitive/canal-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "canal-json");
+        kafkaConfig.put("topic", topic);
+
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), "test-case-insensitive"))
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        waitingTables("t1");
+        FileStoreTable table = getFileStoreTable("t1");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
+                        },
+                        new String[] {"k1", "v0", "v1"});
+        waitForResult(
+                Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"),
+                table,
+                rowType,
+                Collections.singletonList("k1"));
+    }
 }
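
Both Kafka tests select the case-insensitive catalog purely through configuration instead of injecting a catalog instance; the reflection-based injection this supersedes is removed from the MongoDB test in file 5. The metastore identifier set here is matched against CatalogFactory implementations discovered on the classpath, and in this test suite it presumably resolves to TestCaseInsensitiveCatalogFactory (the import removed in file 5 names it), whose catalog reports itself as case-insensitive. Spelled out, with the METASTORE option key being "metastore", the catalog config amounts to:

    // Equivalent to Collections.singletonMap("metastore", "test-case-insensitive");
    // a standalone illustration, not a line from the diff.
    Map<String, String> catalogConfig =
            Collections.singletonMap(CatalogOptions.METASTORE.key(), "test-case-insensitive");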

Changed file 4 of 8 (Kafka ogg-json sync-database test):

@@ -18,6 +18,7 @@

 package org.apache.paimon.flink.action.cdc.kafka;

+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.DataType;

@@ -462,4 +463,46 @@ private void includingAndExcludingTablesImpl(
         waitingTables(existedTables);
         assertTableNotExists(notExistedTables);
     }
+
+    @Test
+    @Timeout(60)
+    public void testCaseInsensitive() throws Exception {
+        final String topic = "case-insensitive";
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Write the ogg json into Kafka -------------------
+
+        writeRecordsToKafka(topic, readLines("kafka/ogg/database/case-insensitive/ogg-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "ogg-json");
+        kafkaConfig.put("topic", topic);
+
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), "test-case-insensitive"))
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        waitingTables("t1");
+        FileStoreTable table = getFileStoreTable("t1");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys1 = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]",
+                        "+I[102, car battery, 12V car battery, 8.100000381469727]");
+        waitForResult(expected, table, rowType, primaryKeys1);
+    }
 }

Changed file 5 of 8 (MongoDB sync-database test):

@@ -18,10 +18,7 @@

 package org.apache.paimon.flink.action.cdc.mongodb;

-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.action.ActionBase;
-import org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory;
-import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;

@@ -30,7 +27,6 @@

 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;

-import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;

@@ -187,21 +183,17 @@ public void testMongoDBNestedDataSynchronizationAndVerification() throws Exception {
     @Test
     @Timeout(60)
     public void testDynamicTableCreationInMongoDB() throws Exception {
-        catalog =
-                new TestCaseInsensitiveCatalogFactory()
-                        .createCatalog(CatalogContext.create(new Path(warehouse)));
         String dbName = database + UUID.randomUUID();
         writeRecordsToMongoDB("test-data-5", dbName, "database");
         Map<String, String> mongodbConfig = getBasicMongoDBConfig();
         mongodbConfig.put("database", dbName);
         MongoDBSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mongodbConfig)
                         .withTableConfig(getBasicTableConfig())
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), "test-case-insensitive"))
                         .build();
-        Field catalogField = ActionBase.class.getDeclaredField("catalog");
-        catalogField.setAccessible(true);
-        Object newCatalog = catalog;
-        catalogField.set(action, newCatalog);
         runActionWithDefaultEnv(action);

         waitingTables("t3");

New file 6 of 8: kafka/canal/database/case-insensitive/canal-data-1.txt (the Canal test data read in file 3):

@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k1":"5","v0":"five","V1":"50"}],"database":"paimon_sync_database_affix","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","V1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770072286,"type":"INSERT"}
+{"data":[{"K1":"7","v0":"seven","V1":"70"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":84,"isDdl":false,"mysqlType":{"K1":"INT","v0":"VARCHAR(10)","V1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770073254,"type":"INSERT"}

New file 7 of 8: kafka/ogg/database/case-insensitive/ogg-data-1.txt (the Oracle GoldenGate test data read in file 4):

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+{"table":"PAIMON_SYNC_DATABASE.T1","pos":"00000000000000000000143","primary_keys":["ID"],"after":{"ID":101,"NAME":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op_type":"I", "current_ts":"2020-05-13T13:39:35.766000", "op_ts":"2020-05-13 15:40:06.000000"}
+{"table":"PAIMON_SYNC_DATABASE.T1","pos":"00000000000000000000144","primary_keys":["id"],"after":{"ID":102,"name":"car battery","description":"12V car battery","WEIGHT":8.100000381469727},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}

Changed file 8 of 8 (RichCdcMultiplexRecordSchemaBuilder):

@@ -21,8 +21,12 @@
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;

+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;

 /** Schema builder for {@link RichCdcMultiplexRecord}. */
 public class RichCdcMultiplexRecordSchemaBuilder

@@ -44,12 +48,16 @@ public Optional<Schema> build(RichCdcMultiplexRecord record) {

         for (Map.Entry<String, DataType> entry : record.fieldTypes().entrySet()) {
             builder.column(
-                    caseSensitive ? entry.getKey() : entry.getKey().toLowerCase(),
-                    entry.getValue(),
-                    null);
+                    caseSensitiveConversion(entry.getKey(), caseSensitive), entry.getValue(), null);
         }

-        Schema schema = builder.primaryKey(record.primaryKeys()).build();
+        List<String> primaryKeys =
+                caseSensitive
+                        ? record.primaryKeys()
+                        : record.primaryKeys().stream()
+                                .map(String::toLowerCase)
+                                .collect(Collectors.toList());
+        Schema schema = builder.primaryKey(primaryKeys).build();

         return Optional.of(schema);
     }
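
The new static import replaces an inlined ternary, and the removed lines pin down its semantics: presumably StringUtils.caseSensitiveConversion is that expression factored into a reusable helper. A sketch inferred from the code it supersedes, not copied from the real class:

    // Inferred from the removed inline code; the actual method lives in
    // org.apache.paimon.utils.StringUtils, outside this diff.
    public static String caseSensitiveConversion(String name, boolean caseSensitive) {
        return caseSensitive ? name : name.toLowerCase();
    }

Lower-casing record.primaryKeys() alongside the columns is what lets the ogg records in file 7 declare primary_keys as ["ID"] and ["id"] interchangeably: both resolve to the column "id" the test asserts.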
