From fec92ea5fc8023f5c74929f5ed4bffc66a66cdfb Mon Sep 17 00:00:00 2001
From: Matthew Ho
Date: Mon, 28 Aug 2023 17:52:13 -0700
Subject: [PATCH 1/2] [GOBBLIN-1890] Offset ranges allow multiple formats GMIP

---
 .../hive/writer/HiveMetadataWriter.java       |  5 ++--
 .../writer/HiveMetadataWriterTest.java        | 29 ++++++++++++++++---
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index e5ac2741c33..98b371331ef 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -267,14 +267,15 @@ private boolean createTable(HiveSpec tableSpec, String tableKey) {
   }
 
   @Nullable
-  private String getTopicName(GobblinMetadataChangeEvent gmce) {
+  protected String getTopicName(GobblinMetadataChangeEvent gmce) {
     //Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null
     //todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation
     String topicName = null;
     if (gmce.getTopicPartitionOffsetsRange() != null && !gmce.getTopicPartitionOffsetsRange().isEmpty()) {
       String topicPartitionString = gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
       //In case the topic name is not the table name or the topic name contains '-'
-      topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
+      int startOfTopicName = topicPartitionString.lastIndexOf('.') + 1;
+      topicName = topicPartitionString.substring(startOfTopicName, topicPartitionString.lastIndexOf('-'));
     }
     return topicName;
   }
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 2797bdf4960..d442b942f24 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -109,6 +109,7 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
 
   private GobblinMCEWriter gobblinMCEWriter;
 
+  GobblinMetadataChangeEvent.Builder gmceBuilder;
   GobblinMetadataChangeEvent gmce;
   static File tmpDir;
   static File dataDir;
@@ -163,7 +164,7 @@ public void setUp() throws Exception {
     writeRecord(dailyDataFile);
     Map registrationState = new HashMap();
     registrationState.put("hive.database.name", dbName);
-    gmce = GobblinMetadataChangeEvent.newBuilder()
+    gmceBuilder = GobblinMetadataChangeEvent.newBuilder()
         .setDatasetIdentifier(DatasetIdentifier.newBuilder()
             .setDataOrigin(DataOrigin.EI)
             .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
@@ -183,13 +184,14 @@ public void setUp() throws Exception {
         .setPartitionColumns(Lists.newArrayList("testpartition"))
         .setRegistrationPolicy(TestHiveRegistrationPolicy.class.getName())
         .setRegistrationProperties(registrationState)
-        .setAllowedMetadataWriters(Collections.singletonList(HiveMetadataWriter.class.getName()))
-        .build();
+        .setAllowedMetadataWriters(Collections.singletonList(TestHiveMetadataWriter.class.getName()));
+    gmce = gmceBuilder.build();
+
     state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS, KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
state.setProp("default.hive.registration.policy", TestHiveRegistrationPolicy.class.getName()); - state.setProp("gmce.metadata.writer.classes", "org.apache.gobblin.hive.writer.HiveMetadataWriter"); + state.setProp("gmce.metadata.writer.classes", TestHiveMetadataWriter.class.getName()); gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state); } @@ -401,6 +403,21 @@ public void testUpdateLatestSchemaWithExistingSchema() throws IOException { Assert.assertThrows(IllegalStateException.class, () -> updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral)); } + @Test + public void testGetTopicName() { + final String expectedTopicName = "123-topic-Name-123_v2"; + Function getGmce = (offsetRangeKey) -> { + Map offsetRangeMap = new HashMap<>(); + offsetRangeMap.put(String.format(offsetRangeKey, expectedTopicName), "0-100"); + return GobblinMetadataChangeEvent.newBuilder(gmceBuilder).setTopicPartitionOffsetsRange(offsetRangeMap).build(); + }; + + TestHiveMetadataWriter hiveWriter = (TestHiveMetadataWriter) gobblinMCEWriter.getMetadataWriters().get(0); + Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("%s-0")), expectedTopicName); + Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("kafkaIdentifier.%s-0")), expectedTopicName); + Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("kafkaIdentifier.foobar.%s-0")), expectedTopicName); + } + private String writeRecord(File file) throws IOException { GenericData.Record record = new GenericData.Record(avroDataSchema); record.put("id", 1L); @@ -427,6 +444,10 @@ public TestHiveMetadataWriter(State state) throws IOException { super(state); } + public String getTopicName(GobblinMetadataChangeEvent gmce) { + return super.getTopicName(gmce); + } + public static boolean updateLatestSchemaMapWithExistingSchema( String dbName, String tableName, From 49261bc65030f866fceed0ff2b28832d0609d4c9 Mon Sep 17 00:00:00 2001 From: Matthew Ho Date: Tue, 29 Aug 2023 00:24:24 -0700 Subject: [PATCH 2/2] Mirror the change in IcebergMetadataWriter --- .../apache/gobblin/hive/writer/HiveMetadataWriter.java | 10 +++++++--- .../gobblin/iceberg/writer/IcebergMetadataWriter.java | 3 ++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java index 98b371331ef..d86c0c947c0 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java @@ -272,14 +272,18 @@ protected String getTopicName(GobblinMetadataChangeEvent gmce) { //todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation String topicName = null; if (gmce.getTopicPartitionOffsetsRange() != null && !gmce.getTopicPartitionOffsetsRange().isEmpty()) { + // In case the topic name is not the table name or the topic name contains '-' String topicPartitionString = gmce.getTopicPartitionOffsetsRange().keySet().iterator().next(); - //In case the topic name is not the table name or the topic name contains '-' - int startOfTopicName = topicPartitionString.lastIndexOf('.') + 1; - topicName = topicPartitionString.substring(startOfTopicName, topicPartitionString.lastIndexOf('-')); + topicName = parseTopicNameFromOffsetRangeKey(topicPartitionString); } return topicName; } + public static String 
+    int startOfTopicName = offsetRangeKey.lastIndexOf('.') + 1;
+    return offsetRangeKey.substring(startOfTopicName, offsetRangeKey.lastIndexOf('-'));
+  }
+
   /**
    * We care about if a table key is in the spec cache because it means that we have already created this table before
    * since the last flush. Therefore, we can use this method to check whether we need to create a table
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index d614e25240c..713378b99af 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -107,6 +107,7 @@
 import org.apache.gobblin.hive.HivePartition;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.HiveMetadataWriter;
 import org.apache.gobblin.hive.writer.MetadataWriter;
 import org.apache.gobblin.hive.writer.MetadataWriterKeys;
 import org.apache.gobblin.iceberg.Utils.IcebergUtils;
@@ -813,7 +814,7 @@ protected String getTopicName(TableIdentifier tid, TableMetadata tableMetadata) {
     if (tableMetadata.dataOffsetRange.isPresent() && tableMetadata.dataOffsetRange.get().size() != 0) {
       String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
       //In case the topic name is not the table name or the topic name contains '-'
-      return topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
+      return HiveMetadataWriter.parseTopicNameFromOffsetRangeKey(topicPartitionString);
     }
     return tableMetadata.newProperties.or(
         Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))).get(TOPIC_NAME_KEY);
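
For illustration, the parsing rule shared by both writers after this change accepts offset-range keys of the
form <topic>-<partition> as well as <kafkaIdentifier>.<topic>-<partition>: the topic name starts after the
last '.' and ends before the last '-'. The following is a minimal standalone sketch of that rule, not part of
the patch; the class name and main driver are illustrative only, and the sample keys are taken from
testGetTopicName above. The authoritative implementation is HiveMetadataWriter.parseTopicNameFromOffsetRangeKey.

// Minimal standalone sketch of the offset-range-key parsing rule; illustrative only.
public class OffsetRangeKeyParsingSketch {

  // Keys look like "<topic>-<partition>" or "<kafkaIdentifier>.<topic>-<partition>"
  // (the identifier may itself contain dots). The topic name starts after the last '.'
  // and ends before the last '-'.
  static String parseTopicName(String offsetRangeKey) {
    int startOfTopicName = offsetRangeKey.lastIndexOf('.') + 1; // -1 + 1 == 0 when there is no '.'
    return offsetRangeKey.substring(startOfTopicName, offsetRangeKey.lastIndexOf('-'));
  }

  public static void main(String[] args) {
    // Each of these prints "123-topic-Name-123_v2", matching the expectations in the test.
    System.out.println(parseTopicName("123-topic-Name-123_v2-0"));
    System.out.println(parseTopicName("kafkaIdentifier.123-topic-Name-123_v2-0"));
    System.out.println(parseTopicName("kafkaIdentifier.foobar.123-topic-Name-123_v2-0"));
  }
}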