[GOBBLIN-1890] Offset ranges allow multiple formats GMIP #3753

Merged
HiveMetadataWriter.java:

@@ -267,18 +267,23 @@ private boolean createTable(HiveSpec tableSpec, String tableKey) {
   }
 
   @Nullable
-  private String getTopicName(GobblinMetadataChangeEvent gmce) {
+  protected String getTopicName(GobblinMetadataChangeEvent gmce) {
     //Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null
     //todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation
     String topicName = null;
     if (gmce.getTopicPartitionOffsetsRange() != null && !gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+      // In case the topic name is not the table name or the topic name contains '-'
       String topicPartitionString = gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
-      //In case the topic name is not the table name or the topic name contains '-'
-      topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
+      topicName = parseTopicNameFromOffsetRangeKey(topicPartitionString);
     }
     return topicName;
   }
 
+  public static String parseTopicNameFromOffsetRangeKey(String offsetRangeKey) {
+    int startOfTopicName = offsetRangeKey.lastIndexOf('.') + 1;
+    return offsetRangeKey.substring(startOfTopicName, offsetRangeKey.lastIndexOf('-'));
+  }
+
   /**
    * We care about if a table key is in the spec cache because it means that we have already created this table before
    * since the last flush. Therefore, we can use this method to check whether we need to create a table
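For context, a minimal standalone sketch of what the new helper does (class name, keys, and printed values here are illustrative, not from the PR): it strips an optional dotted prefix up to the last '.', then strips the trailing '-<partition>' suffix, so topic names that themselves contain '-' survive intact.

public class ParseTopicNameSketch {

  // Same logic as the new HiveMetadataWriter.parseTopicNameFromOffsetRangeKey:
  // drop everything up to and including the last '.' (an optional dotted
  // prefix), then drop the trailing '-<partition>' suffix.
  static String parseTopicNameFromOffsetRangeKey(String offsetRangeKey) {
    int startOfTopicName = offsetRangeKey.lastIndexOf('.') + 1;
    return offsetRangeKey.substring(startOfTopicName, offsetRangeKey.lastIndexOf('-'));
  }

  public static void main(String[] args) {
    System.out.println(parseTopicNameFromOffsetRangeKey("myTopic-0"));                // myTopic
    System.out.println(parseTopicNameFromOffsetRangeKey("kafkaId.myTopic-0"));        // myTopic
    System.out.println(parseTopicNameFromOffsetRangeKey("kafkaId.foobar.myTopic-0")); // myTopic
    // Hyphens inside the topic name survive, because only the last '-' is
    // treated as the partition separator:
    System.out.println(parseTopicNameFromOffsetRangeKey("123-topic-Name-123_v2-0"));  // 123-topic-Name-123_v2
  }
}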
IcebergMetadataWriter.java:

@@ -107,6 +107,7 @@
 import org.apache.gobblin.hive.HivePartition;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.HiveMetadataWriter;
 import org.apache.gobblin.hive.writer.MetadataWriter;
 import org.apache.gobblin.hive.writer.MetadataWriterKeys;
 import org.apache.gobblin.iceberg.Utils.IcebergUtils;
@@ -813,7 +814,7 @@ protected String getTopicName(TableIdentifier tid, TableMetadata tableMetadata)
     if (tableMetadata.dataOffsetRange.isPresent() && tableMetadata.dataOffsetRange.get().size() != 0) {
       String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
       //In case the topic name is not the table name or the topic name contains '-'
-      return topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
+      return HiveMetadataWriter.parseTopicNameFromOffsetRangeKey(topicPartitionString);
     }
     return tableMetadata.newProperties.or(
         Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))).get(TOPIC_NAME_KEY);
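For reference, a sketch of the offset-range map both writers now parse the same way (the cluster prefix, topic, partitions, and offset values are made-up examples; only the "0-100" value shape is taken from the test below, and reading it as a start/end offset pair is an assumption):

import java.util.HashMap;
import java.util.Map;

public class OffsetRangeKeySketch {
  public static void main(String[] args) {
    // Keys follow "<optional.dotted.prefix.><topic>-<partition>"; values look
    // like "0-100" (presumably a start/end offset pair; an assumption here).
    Map<String, String> topicPartitionOffsetsRange = new HashMap<>();
    topicPartitionOffsetsRange.put("kafkaCluster.myTopic-0", "0-100");
    topicPartitionOffsetsRange.put("kafkaCluster.myTopic-1", "100-200");
    // Both writers take an arbitrary key from this map and recover the topic
    // name from it via HiveMetadataWriter.parseTopicNameFromOffsetRangeKey,
    // whether or not the key carries a dotted prefix.
    String anyKey = topicPartitionOffsetsRange.keySet().iterator().next();
    System.out.println(anyKey);  // e.g. kafkaCluster.myTopic-0
  }
}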
HiveMetadataWriterTest.java:

@@ -109,6 +109,7 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {

   private GobblinMCEWriter gobblinMCEWriter;
 
+  GobblinMetadataChangeEvent.Builder gmceBuilder;
   GobblinMetadataChangeEvent gmce;
   static File tmpDir;
   static File dataDir;
@@ -163,7 +164,7 @@ public void setUp() throws Exception {
     writeRecord(dailyDataFile);
     Map<String, String> registrationState = new HashMap();
     registrationState.put("hive.database.name", dbName);
-    gmce = GobblinMetadataChangeEvent.newBuilder()
+    gmceBuilder = GobblinMetadataChangeEvent.newBuilder()
         .setDatasetIdentifier(DatasetIdentifier.newBuilder()
             .setDataOrigin(DataOrigin.EI)
             .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
@@ -183,13 +184,14 @@
         .setPartitionColumns(Lists.newArrayList("testpartition"))
         .setRegistrationPolicy(TestHiveRegistrationPolicy.class.getName())
         .setRegistrationProperties(registrationState)
-        .setAllowedMetadataWriters(Collections.singletonList(HiveMetadataWriter.class.getName()))
-        .build();
+        .setAllowedMetadataWriters(Collections.singletonList(TestHiveMetadataWriter.class.getName()));
+    gmce = gmceBuilder.build();
+
 
     state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
         KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
     state.setProp("default.hive.registration.policy",
         TestHiveRegistrationPolicy.class.getName());
-    state.setProp("gmce.metadata.writer.classes", "org.apache.gobblin.hive.writer.HiveMetadataWriter");
+    state.setProp("gmce.metadata.writer.classes", TestHiveMetadataWriter.class.getName());
     gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
   }

@@ -401,6 +403,21 @@ public void testUpdateLatestSchemaWithExistingSchema() throws IOException {
     Assert.assertThrows(IllegalStateException.class, () -> updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral));
   }
 
+  @Test
+  public void testGetTopicName() {
+    final String expectedTopicName = "123-topic-Name-123_v2";
+    Function<String, GobblinMetadataChangeEvent> getGmce = (offsetRangeKey) -> {
+      Map<String, String> offsetRangeMap = new HashMap<>();
+      offsetRangeMap.put(String.format(offsetRangeKey, expectedTopicName), "0-100");
+      return GobblinMetadataChangeEvent.newBuilder(gmceBuilder).setTopicPartitionOffsetsRange(offsetRangeMap).build();
+    };
+
+    TestHiveMetadataWriter hiveWriter = (TestHiveMetadataWriter) gobblinMCEWriter.getMetadataWriters().get(0);
+    Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("%s-0")), expectedTopicName);
+    Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("kafkaIdentifier.%s-0")), expectedTopicName);
+    Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("kafkaIdentifier.foobar.%s-0")), expectedTopicName);
+  }
+
   private String writeRecord(File file) throws IOException {
     GenericData.Record record = new GenericData.Record(avroDataSchema);
     record.put("id", 1L);
@@ -427,6 +444,10 @@ public TestHiveMetadataWriter(State state) throws IOException {
     super(state);
   }
 
+  public String getTopicName(GobblinMetadataChangeEvent gmce) {
+    return super.getTopicName(gmce);
+  }
+
   public static boolean updateLatestSchemaMapWithExistingSchema(
       String dbName,
       String tableName,
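Aside on the test seam used above (a minimal sketch, not code from this PR): getTopicName is only protected in HiveMetadataWriter, so the TestHiveMetadataWriter subclass re-declares it as public and delegates to super, which is what lets testGetTopicName call it directly. The class names below are hypothetical stand-ins.

// Minimal sketch of the visibility-widening pattern, with hypothetical names.
class Writer {
  protected String getTopicName(String key) {  // not directly callable from tests in another package
    return key;
  }
}

class TestWriter extends Writer {
  @Override
  public String getTopicName(String key) {     // Java allows widening protected to public in an override
    return super.getTopicName(key);
  }
}

public class VisibilityWideningSketch {
  public static void main(String[] args) {
    System.out.println(new TestWriter().getTopicName("myTopic-0"));  // prints "myTopic-0"
  }
}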