Skip to content

Commit

Permalink
Core: Change RemoveSnapshots to remove unused schemas
Browse files Browse the repository at this point in the history
Similarly to removing partition specs, this PR improves RemoveSnapshots
to also remove unused schemas.
  • Loading branch information
gaborkaszab committed Feb 12, 2025
1 parent 3e6da2e commit 60744fa
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 62 deletions.
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class RemoveSchemas implements MetadataUpdate {
private final Set<Integer> schemaIds;

public RemoveSchemas(Set<Integer> schemaIds) {
this.schemaIds = schemaIds;
}

public Set<Integer> schemaIds() {
return schemaIds;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removeSchemas(schemaIds);
}
}

class AddSortOrder implements MetadataUpdate {
private final UnboundSortOrder sortOrder;

Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ private MetadataUpdateParser() {}
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
static final String REMOVE_SCHEMAS = "remove-schemas";
static final String ENABLE_ROW_LINEAGE = "enable-row-lineage";

// AssignUUID
Expand Down Expand Up @@ -131,6 +132,9 @@ private MetadataUpdateParser() {}
// RemovePartitionSpecs
private static final String SPEC_IDS = "spec-ids";

// RemoveSchemas
private static final String SCHEMA_IDS = "schema-ids";

private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
Expand All @@ -155,6 +159,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.put(MetadataUpdate.RemoveSchemas.class, REMOVE_SCHEMAS)
.put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE)
.buildOrThrow();

Expand Down Expand Up @@ -251,6 +256,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
break;
case REMOVE_SCHEMAS:
writeRemoveSchemas((MetadataUpdate.RemoveSchemas) metadataUpdate, generator);
break;
case ENABLE_ROW_LINEAGE:
break;
default:
Expand Down Expand Up @@ -326,6 +334,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemovePartitionSpecs(jsonNode);
case REMOVE_SCHEMAS:
return readRemoveSchemas(jsonNode);
case ENABLE_ROW_LINEAGE:
return new MetadataUpdate.EnableRowLineage();
default:
Expand Down Expand Up @@ -468,6 +478,11 @@ private static void writeRemovePartitionSpecs(
JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen);
}

private static void writeRemoveSchemas(
MetadataUpdate.RemoveSchemas metadataUpdate, JsonGenerator gen) throws IOException {
JsonUtil.writeIntegerArray(SCHEMA_IDS, metadataUpdate.schemaIds(), gen);
}

private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
Expand Down Expand Up @@ -620,4 +635,8 @@ private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) {
return new MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node));
}

private static MetadataUpdate readRemoveSchemas(JsonNode node) {
return new MetadataUpdate.RemoveSchemas(JsonUtil.getIntegerSet(SCHEMA_IDS, node));
}
}
22 changes: 17 additions & 5 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,23 +218,35 @@ private TableMetadata internalApply() {
updatedMetaBuilder.removeSnapshots(idsToRemove);

if (cleanExpiredMetadata) {
// TODO: Support cleaning expired schema as well.
Set<Integer> reachableSpecs = Sets.newConcurrentHashSet();
reachableSpecs.add(base.defaultSpecId());
Set<Integer> reachableSchemas = Sets.newConcurrentHashSet();
reachableSchemas.add(base.currentSchemaId());

Tasks.foreach(idsToRetain)
.executeWith(planExecutorService)
.run(
snapshot ->
base.snapshot(snapshot).allManifests(ops.io()).stream()
.map(ManifestFile::partitionSpecId)
.forEach(reachableSpecs::add));
snapshotId -> {
Snapshot snapshot = base.snapshot(snapshotId);
snapshot.allManifests(ops.io()).stream()
.map(ManifestFile::partitionSpecId)
.forEach(reachableSpecs::add);
reachableSchemas.add(snapshot.schemaId());
});

Set<Integer> specsToRemove =
base.specs().stream()
.map(PartitionSpec::specId)
.filter(specId -> !reachableSpecs.contains(specId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeSpecs(specsToRemove);

Set<Integer> schemasToRemove =
base.schemas().stream()
.map(Schema::schemaId)
.filter(schemaId -> !reachableSchemas.contains(schemaId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeSchemas(schemasToRemove);
}

return updatedMetaBuilder.build();
Expand Down
19 changes: 18 additions & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ public static class Builder {
private long lastSequenceNumber;
private int lastColumnId;
private int currentSchemaId;
private final List<Schema> schemas;
private List<Schema> schemas;
private int defaultSpecId;
private List<PartitionSpec> specs;
private int lastAssignedPartitionId;
Expand Down Expand Up @@ -1183,6 +1183,23 @@ Builder removeSpecs(Iterable<Integer> specIds) {
.filter(s -> !specIdsToRemove.contains(s.specId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));

return this;
}

Builder removeSchemas(Iterable<Integer> schemaIds) {
Set<Integer> schemaIdsToRemove = Sets.newHashSet(schemaIds);
Preconditions.checkArgument(
!schemaIdsToRemove.contains(currentSchemaId), "Cannot remove the current schema");

if (!schemaIdsToRemove.isEmpty()) {
this.schemas =
schemas.stream()
.filter(s -> !schemaIdsToRemove.contains(s.schemaId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemoveSchemas(schemaIdsToRemove));
}

return this;
}

Expand Down
45 changes: 29 additions & 16 deletions core/src/main/java/org/apache/iceberg/UpdateRequirements.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ private Builder update(MetadataUpdate update) {
update((MetadataUpdate.SetDefaultSortOrder) update);
} else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
update((MetadataUpdate.RemovePartitionSpecs) update);
} else if (update instanceof MetadataUpdate.RemoveSchemas) {
update((MetadataUpdate.RemoveSchemas) update);
}

return this;
Expand Down Expand Up @@ -136,13 +138,7 @@ private void update(MetadataUpdate.AddSchema unused) {
}

private void update(MetadataUpdate.SetCurrentSchema unused) {
if (!setSchemaId) {
if (base != null && !isReplace) {
// require that the current schema has not changed
require(new UpdateRequirement.AssertCurrentSchemaID(base.currentSchemaId()));
}
this.setSchemaId = true;
}
requireCurrentSchemaNotChanged();
}

private void update(MetadataUpdate.AddPartitionSpec unused) {
Expand All @@ -156,13 +152,7 @@ private void update(MetadataUpdate.AddPartitionSpec unused) {
}

private void update(MetadataUpdate.SetDefaultPartitionSpec unused) {
if (!setSpecId) {
if (base != null && !isReplace) {
// require that the default spec has not changed
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
}
this.setSpecId = true;
}
requireDefaultPartitionSpecNotChanged();
}

private void update(MetadataUpdate.SetDefaultSortOrder unused) {
Expand All @@ -176,15 +166,38 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) {
}

private void update(MetadataUpdate.RemovePartitionSpecs unused) {
// require that the default partition spec has not changed
requireDefaultPartitionSpecNotChanged();

// require that no branches have changed, so that old specs won't be written.
requireNoBranchesChanged();
}

private void update(MetadataUpdate.RemoveSchemas unused) {
requireCurrentSchemaNotChanged();

// require that no branches have changed, so that old schemas won't be written.
requireNoBranchesChanged();
}

private void requireDefaultPartitionSpecNotChanged() {
if (!setSpecId) {
if (base != null && !isReplace) {
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
}
this.setSpecId = true;
}
}

// require that no branches have changed, so that old specs won't be written.
private void requireCurrentSchemaNotChanged() {
if (!setSchemaId) {
if (base != null && !isReplace) {
require(new UpdateRequirement.AssertCurrentSchemaID(base.currentSchemaId()));
}
this.setSchemaId = true;
}
}

private void requireNoBranchesChanged() {
if (base != null && !isReplace) {
base.refs()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,17 @@ public void testRemovePartitionSpec() {
.isEqualTo(json);
}

@Test
public void testRemoveSchemas() {
String action = MetadataUpdateParser.REMOVE_SCHEMAS;
String json = "{\"action\":\"remove-schemas\",\"schema-ids\":[1,2,3]}";
MetadataUpdate expected = new MetadataUpdate.RemoveSchemas(ImmutableSet.of(1, 2, 3));
assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
assertThat(MetadataUpdateParser.toJson(expected))
.as("Remove schemas should convert to the correct JSON value")
.isEqualTo(json);
}

@Test
public void testEnableRowLineage() {
String action = MetadataUpdateParser.ENABLE_ROW_LINEAGE;
Expand Down Expand Up @@ -1050,6 +1061,11 @@ public void assertEquals(
(MetadataUpdate.RemovePartitionSpecs) expectedUpdate,
(MetadataUpdate.RemovePartitionSpecs) actualUpdate);
break;
case MetadataUpdateParser.REMOVE_SCHEMAS:
assertEqualsRemoveSchemas(
(MetadataUpdate.RemoveSchemas) expectedUpdate,
(MetadataUpdate.RemoveSchemas) actualUpdate);
break;
case MetadataUpdateParser.ENABLE_ROW_LINEAGE:
assertThat(actualUpdate).isInstanceOf(MetadataUpdate.EnableRowLineage.class);
break;
Expand Down Expand Up @@ -1279,6 +1295,11 @@ private static void assertEqualsRemovePartitionSpecs(
assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds());
}

private static void assertEqualsRemoveSchemas(
MetadataUpdate.RemoveSchemas expected, MetadataUpdate.RemoveSchemas actual) {
assertThat(actual.schemaIds()).containsExactlyInAnyOrderElementsOf(expected.schemaIds());
}

private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId)
throws IOException {
File manifestList = File.createTempFile("manifests", null, temp.toFile());
Expand Down
75 changes: 75 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -48,8 +50,10 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;

@ExtendWith(ParameterizedTestExtension.class)
public class TestRemoveSnapshots extends TestBase {
Expand Down Expand Up @@ -1705,6 +1709,77 @@ public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException {
.containsExactly(dataBucketSpec.specId());
}

@TestTemplate
public void testRemoveSchemas() {
table.newAppend().appendFile(FILE_A).commit();

Set<String> expectedDeletedFiles = Sets.newHashSet();
expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation());

table.updateSchema().addColumn("extra_col1", Types.StringType.get()).commit();

table.newAppend().appendFile(FILE_B).commit();
expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation());

table.updateSchema().addColumn("extra_col2", Types.LongType.get()).deleteColumn("id").commit();

table.newAppend().appendFile(FILE_A2).commit();

assertThat(table.schemas().size()).isEqualTo(3);

Set<String> deletedFiles = Sets.newHashSet();
// Expire all snapshots and schemas except the current ones.
removeSnapshots(table)
.expireOlderThan(System.currentTimeMillis())
.cleanExpiredMetadata(true)
.deleteWith(deletedFiles::add)
.commit();

assertThat(deletedFiles).containsExactlyInAnyOrderElementsOf(expectedDeletedFiles);
assertThat(table.schemas().values()).containsExactly(table.schema());
}

@TestTemplate
public void testNoSchemasToRemove() {
String tableName = "test_no_schemas_to_remove";
TestTables.TestTableOperations ops =
Mockito.spy(new TestTables.TestTableOperations(tableName, tableDir));
TestTables.TestTable table =
TestTables.create(
tableDir,
tableName,
SCHEMA,
PartitionSpec.unpartitioned(),
SortOrder.unsorted(),
formatVersion,
ops);

table.newAppend().appendFile(FILE_A).commit();

Set<String> expectedDeletedFiles = Sets.newHashSet();
expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation());

table.newAppend().appendFile(FILE_B).commit();

Set<String> deletedFiles = Sets.newHashSet();
// Expire all snapshots except the current one. No unused schemas to be removed.
removeSnapshots(table)
.expireOlderThan(System.currentTimeMillis())
.cleanExpiredMetadata(true)
.deleteWith(deletedFiles::add)
.commit();

assertThat(deletedFiles).containsExactlyInAnyOrderElementsOf(expectedDeletedFiles);
assertThat(table.schemas().values()).containsExactly(table.schema());
Mockito.verify(ops, Mockito.never())
.commit(
any(),
argThat(
meta ->
meta.changes().stream()
.anyMatch(u -> u instanceof MetadataUpdate.RemoveSchemas)));
}

private Set<String> manifestPaths(Snapshot snapshot, FileIO io) {
return snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet());
}
Expand Down
Loading

0 comments on commit 60744fa

Please sign in to comment.