Skip to content

Commit

Permalink
API, Core: Metadata Row Lineage (#11948)
Browse files Browse the repository at this point in the history
  • Loading branch information
RussellSpitzer authored Feb 1, 2025
1 parent cae7d1b commit 1fcd6a9
Show file tree
Hide file tree
Showing 17 changed files with 801 additions and 46 deletions.
25 changes: 25 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,29 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
default Integer schemaId() {
return null;
}

/**
* The row-id of the first newly added row in this snapshot. All rows added in this snapshot will
* have a row-id assigned to them greater than or equal to this value. All rows with a row-id less
* than this value were created in a snapshot that was added to the table (but not necessarily
* committed to this branch) in the past.
*
* <p>The default implementation returns {@code null}.
*
* @return the first row-id to be used in this snapshot or null if row lineage was not enabled
* when the table was created.
*/
default Long firstRowId() {
return null;
}

/**
* The total number of newly added rows in this snapshot. It should be the sum of {@link
* ManifestFile#ADDED_ROWS_COUNT} over every manifest added in this snapshot.
*
* <p>This field is optional in general, but it is required when row lineage is enabled.
*
* <p>The default implementation returns {@code null}.
*
* @return the total number of new rows in this snapshot or null if the value was not stored.
*/
default Long addedRows() {
return null;
}
}
20 changes: 19 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class BaseSnapshot implements Snapshot {
private final Map<String, String> summary;
private final Integer schemaId;
private final String[] v1ManifestLocations;
private final Long firstRowId;
private final Long addedRows;

// lazily initialized
private transient List<ManifestFile> allManifests = null;
Expand All @@ -61,7 +63,9 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
String manifestList) {
String manifestList,
Long firstRowId,
Long addedRows) {
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
Expand All @@ -71,6 +75,8 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = manifestList;
this.v1ManifestLocations = null;
this.firstRowId = firstRowId;
this.addedRows = addedRows;
}

BaseSnapshot(
Expand All @@ -91,6 +97,8 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = null;
this.v1ManifestLocations = v1ManifestLocations;
this.firstRowId = null;
this.addedRows = null;
}

@Override
Expand Down Expand Up @@ -128,6 +136,16 @@ public Integer schemaId() {
return schemaId;
}

/**
* Returns the first row-id held by this snapshot. The value is whatever was passed to the
* manifest-list constructor (possibly null); the constructor that takes v1 manifest locations
* always leaves it null.
*/
@Override
public Long firstRowId() {
return firstRowId;
}

/**
* Returns the added-rows count held by this snapshot. The value is whatever was passed to the
* manifest-list constructor (possibly null); the constructor that takes v1 manifest locations
* always leaves it null.
*/
@Override
public Long addedRows() {
return addedRows;
}

private void cacheManifests(FileIO fileIO) {
if (fileIO == null) {
throw new IllegalArgumentException("Cannot cache changes: FileIO is null");
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -526,4 +526,11 @@ public void applyTo(ViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.setCurrentVersionId(versionId);
}
}

/**
* Metadata update that requests row lineage be enabled on a table. It declares no fields of its
* own; applying it only invokes {@code enableRowLineage()} on the supplied table metadata
* builder.
*/
class EnableRowLineage implements MetadataUpdate {
// Delegates entirely to the builder; there is no payload to pass along.
@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.enableRowLineage();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ private MetadataUpdateParser() {}
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
static final String ENABLE_ROW_LINEAGE = "enable-row-lineage";

// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -154,6 +155,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -249,6 +251,8 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
break;
case ENABLE_ROW_LINEAGE:
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -322,6 +326,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemovePartitionSpecs(jsonNode);
case ENABLE_ROW_LINEAGE:
return new MetadataUpdate.EnableRowLineage();
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ public static TableMetadata replacePaths(
// TODO: update statistic file paths
metadata.statisticsFiles(),
metadata.partitionStatisticsFiles(),
metadata.changes());
metadata.changes(),
metadata.rowLineageEnabled(),
metadata.nextRowId());
}

private static Map<String, String> updateProperties(
Expand Down Expand Up @@ -188,7 +190,9 @@ private static List<Snapshot> updatePathInSnapshots(
snapshot.operation(),
snapshot.summary(),
snapshot.schemaId(),
newManifestListLocation);
newManifestListLocation,
snapshot.firstRowId(),
snapshot.addedRows());
newSnapshots.add(newSnapshot);
}
return newSnapshots;
Expand Down
17 changes: 16 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ private SnapshotParser() {}
private static final String MANIFESTS = "manifests";
private static final String MANIFEST_LIST = "manifest-list";
private static final String SCHEMA_ID = "schema-id";
private static final String FIRST_ROW_ID = "first-row-id";
private static final String ADDED_ROWS = "added-rows";

static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException {
generator.writeStartObject();
Expand Down Expand Up @@ -96,6 +98,14 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio
generator.writeNumberField(SCHEMA_ID, snapshot.schemaId());
}

if (snapshot.firstRowId() != null) {
generator.writeNumberField(FIRST_ROW_ID, snapshot.firstRowId());
}

if (snapshot.addedRows() != null) {
generator.writeNumberField(ADDED_ROWS, snapshot.addedRows());
}

generator.writeEndObject();
}

Expand Down Expand Up @@ -158,6 +168,9 @@ static Snapshot fromJson(JsonNode node) {

Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, node);

Long firstRowId = JsonUtil.getLongOrNull(FIRST_ROW_ID, node);
Long addedRows = JsonUtil.getLongOrNull(ADDED_ROWS, node);

if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
Expand All @@ -169,7 +182,9 @@ static Snapshot fromJson(JsonNode node) {
operation,
summary,
schemaId,
manifestList);
manifestList,
firstRowId,
addedRows);

} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
Expand Down
30 changes: 29 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -282,6 +283,13 @@ public Snapshot apply() {
throw new RuntimeIOException(e, "Failed to write manifest list file");
}

Long addedRows = null;
Long lastRowId = null;
if (base.rowLineageEnabled()) {
addedRows = calculateAddedRows(manifests);
lastRowId = base.nextRowId();
}

return new BaseSnapshot(
sequenceNumber,
snapshotId(),
Expand All @@ -290,7 +298,27 @@ public Snapshot apply() {
operation(),
summary(base),
base.currentSchemaId(),
manifestList.location());
manifestList.location(),
lastRowId,
addedRows);
}

/**
 * Adds up the added-rows counts of the manifests attributed to the snapshot being produced.
 *
 * <p>A manifest is counted when its snapshot id is null or equal to this producer's {@code
 * snapshotId} field; every other manifest is skipped.
 *
 * @param manifests the manifests to inspect
 * @return the sum of {@code addedRowsCount()} over the counted manifests
 * @throws IllegalArgumentException if a counted manifest has no added-rows count (raised by
 *     {@code Preconditions.checkArgument})
 */
private Long calculateAddedRows(List<ManifestFile> manifests) {
  long total = 0L;
  for (ManifestFile candidate : manifests) {
    Long owner = candidate.snapshotId();
    boolean addedHere = owner == null || Objects.equals(owner, this.snapshotId);
    if (!addedHere) {
      // carried over from another snapshot, so its rows are not new
      continue;
    }

    Long rowCount = candidate.addedRowsCount();
    Preconditions.checkArgument(
        rowCount != null,
        "Cannot determine number of added rows in snapshot because"
            + " the entry for manifest %s is missing the field `added-rows-count`",
        candidate.path());
    total += rowCount;
  }

  return total;
}

protected abstract Map<String, String> summary();
Expand Down
Loading

0 comments on commit 1fcd6a9

Please sign in to comment.