[opt](paimon) Upgrade the Paimon version to 1.0.0 and Iceberg to 1.6.1 #46990

Merged: 3 commits, Feb 1, 2025
@@ -196,7 +196,12 @@ public List<Split> getSplits(int numBackends) throws UserException {
try {
return preExecutionAuthenticator.execute(() -> doGetSplits(numBackends));
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
Optional<NotSupportedException> opt = checkNotSupportedException(e);
if (opt.isPresent()) {
throw opt.get();
} else {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}
}
}

@@ -232,7 +237,12 @@ public void doStartSplit() throws UserException {
);
splitAssignment.finishSchedule();
} catch (Exception e) {
splitAssignment.setException(new UserException(e.getMessage(), e));
Optional<NotSupportedException> opt = checkNotSupportedException(e);
if (opt.isPresent()) {
splitAssignment.setException(new UserException(opt.get().getMessage(), opt.get()));
} else {
splitAssignment.setException(new UserException(e.getMessage(), e));
}
}
});
}
@@ -266,40 +276,7 @@ private TableScan createTableScan() throws UserException {

private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
long targetSplitSize = getRealFileSplitSize(0);
CloseableIterable<FileScanTask> splitFiles;
try {
splitFiles = TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
} catch (NullPointerException e) {
/*
Caused by: java.lang.NullPointerException: Type cannot be null
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull
(Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach
(RegularImmutableMap.java:297) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:71) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) ~[iceberg-core-1.4.3.jar:?]
at org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits
(IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT]
EXAMPLE:
CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG PARTITIONED BY (bucket(10,col2));
INSERT INTO iceberg_tb VALUES( ... );
ALTER TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2);
ALTER TABLE iceberg_tb DROP COLUMNS col2 STRING;
Link: https://github.com/apache/iceberg/pull/10755
*/
LOG.warn("Iceberg TableScanUtil.splitFiles throw NullPointerException. Cause : ", e);
throw new NotSupportedException("Unable to read Iceberg table with dropped old partition column.");
}
return splitFiles;
return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
}

private Split createIcebergSplit(FileScanTask fileScanTask) {
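Note: the inline NullPointerException workaround removed from planFileScanTask above (including its stack trace and reproduction SQL) is not dropped; the same check reappears later in this file as the new checkNotSupportedException helper, so planFileScanTask now simply delegates to TableScanUtil.splitFiles.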
@@ -529,4 +506,39 @@ private void assignCountToSplits(List<Split> splits, long totalCount) {
public int numApproximateSplits() {
return NUM_SPLITS_PER_PARTITION * partitionPathSet.size() > 0 ? partitionPathSet.size() : 1;
}

private Optional<NotSupportedException> checkNotSupportedException(Exception e) {
if (e instanceof NullPointerException) {
/*
Caused by: java.lang.NullPointerException: Type cannot be null
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull
(Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach
(RegularImmutableMap.java:297) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:71) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) ~[iceberg-core-1.4.3.jar:?]
at org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits
(IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT]
EXAMPLE:
CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG PARTITIONED BY (bucket(10,col2));
INSERT INTO iceberg_tb VALUES( ... );
ALTER TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2);
ALTER TABLE iceberg_tb DROP COLUMNS col2 STRING;
Link: https://github.com/apache/iceberg/pull/10755
*/
LOG.warn("Iceberg TableScanUtil.splitFiles throw NullPointerException. Cause : ", e);
return Optional.of(
new NotSupportedException("Unable to read Iceberg table with dropped old partition column."));
}
return Optional.empty();
}
}
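Taken together, the IcebergScanNode changes above replace two ad-hoc catch blocks with a single Optional-returning classifier that both getSplits and doStartSplit consult. Below is a minimal, self-contained sketch of that pattern; the class names and the simplified NotSupportedException stand-in are hypothetical, and only the control flow mirrors the PR.

```java
import java.util.Optional;

// Hypothetical stand-in for Doris' NotSupportedException, used only to
// illustrate the control flow introduced by this PR.
class NotSupportedException extends RuntimeException {
    NotSupportedException(String msg) {
        super(msg);
    }
}

public class ExceptionMappingSketch {

    // Classifier: decide whether a low-level failure should surface as a
    // user-facing "not supported" error instead of a generic RuntimeException.
    static Optional<NotSupportedException> checkNotSupportedException(Exception e) {
        if (e instanceof NullPointerException) {
            return Optional.of(new NotSupportedException(
                    "Unable to read Iceberg table with dropped old partition column."));
        }
        return Optional.empty();
    }

    // Caller pattern shared by getSplits() and doStartSplit() in the PR:
    // run the real planning work, and on failure either rethrow the mapped
    // exception or fall back to the previous wrapping behavior.
    static void planSplits(Runnable work) {
        try {
            work.run();
        } catch (Exception e) {
            Optional<NotSupportedException> opt = checkNotSupportedException(e);
            if (opt.isPresent()) {
                throw opt.get();
            }
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        try {
            planSplits(() -> {
                throw new NullPointerException("Type cannot be null");
            });
        } catch (NotSupportedException e) {
            System.out.println("mapped to: " + e.getMessage());
        }
    }
}
```

Centralizing the check keeps the user-facing message for the dropped-partition-column case identical whether splits are planned directly or through the background split scheduler.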
@@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Catalog.TableNotExistException;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
@@ -94,7 +95,15 @@ protected List<String> listDatabaseNames() {
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
try {
return hadoopAuthenticator.doAs(() -> catalog.tableExists(Identifier.create(dbName, tblName)));
return hadoopAuthenticator.doAs(() -> {
try {
catalog.getTable(Identifier.create(dbName, tblName));
Contributor: The API is changed?
Contributor Author: yes
return true;
} catch (TableNotExistException e) {
return false;
}
});

} catch (IOException e) {
throw new RuntimeException("Failed to check table existence, catalog name: " + getName(), e);
}
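The review thread above asks whether the Paimon API changed, and the answer is yes: per the diff, Catalog.tableExists(Identifier) is no longer used, and existence is now probed through Catalog.getTable(Identifier), which throws Catalog.TableNotExistException when the table is missing. A rough sketch of the new call shape, stripped of the hadoopAuthenticator.doAs wrapper and not verified against a specific Paimon release:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Catalog.TableNotExistException;
import org.apache.paimon.catalog.Identifier;

public final class PaimonTableExistsSketch {

    // Paimon 1.0 style existence check: call getTable() and treat
    // TableNotExistException as "table not present".
    public static boolean tableExists(Catalog catalog, String dbName, String tblName) {
        try {
            catalog.getTable(Identifier.create(dbName, tblName));
            return true;
        } catch (TableNotExistException e) {
            return false;
        }
    }
}
```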
@@ -273,7 +273,7 @@ private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws
PredicateBuilder builder = new PredicateBuilder(table.rowType());
Predicate predicate = builder.equal(0, key.getSchemaId());
// Adding predicates will also return excess data
List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate);
List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1, 2}, predicate);
for (InternalRow row : rows) {
PaimonSchema schema = PaimonUtil.rowToSchema(row);
if (schema.getSchemaId() == key.getSchemaId()) {
@@ -97,7 +97,7 @@ private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOE
Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(),
key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS);
// snapshotId and schemaId
List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}}, null);
List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1}, null);
long latestSnapshotId = 0L;
long latestSchemaId = 0L;
for (InternalRow row : rows) {
@@ -62,7 +62,7 @@ public class PaimonUtil {
private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);

public static List<InternalRow> read(
Table table, @Nullable int[][] projection, @Nullable Predicate predicate,
Table table, @Nullable int[] projection, @Nullable Predicate predicate,
Pair<ConfigOption<?>, String>... dynamicOptions)
throws IOException {
Map<String, String> options = new HashMap<>();
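The two metadata-cache hunks above and this PaimonUtil hunk reflect the same change: the projection parameter of PaimonUtil.read is now a flat int[] of top-level field indices rather than a nested int[][], so callers pass new int[] {0, 1, 2} and new int[] {0, 1}. A small sketch of applying such a flat projection when building a Paimon read, assuming ReadBuilder#withProjection(int[]) is the relevant overload (an assumption, not shown in the PR):

```java
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;

public final class PaimonProjectionSketch {

    // Paimon 0.8-era callers expressed this as new int[][] {{0}, {1}, {2}};
    // with 1.0 the same top-level projection is a flat index array.
    public static ReadBuilder projectFirstThreeColumns(Table table) {
        int[] projection = new int[] {0, 1, 2};
        return table.newReadBuilder().withProjection(projection);
    }
}
```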
@@ -208,7 +208,12 @@ public List<Split> getSplits(int numBackends) throws UserException {
.valueOf(sessionVariable.getIgnoreSplitType());
List<Split> splits = new ArrayList<>();
int[] projected = desc.getSlots().stream().mapToInt(
slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
slot -> source.getPaimonTable().rowType()
.getFieldNames()
.stream()
.map(String::toLowerCase)
.collect(Collectors.toList())
.indexOf(slot.getColumn().getName()))
.toArray();
ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.withFilter(predicates)
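The getSplits change in the last Paimon hunk lowercases the Paimon field names before resolving each Doris slot, so tables created with mixed-case column names still map to the lowercase names Doris uses (that rationale is my reading of the change, not stated in the PR). In isolation, the index computation behaves like this sketch:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public final class CaseInsensitiveProjectionSketch {

    // Mirror of the new index computation: lowercase the table's field names,
    // then look up the position of each (already lowercase) query column.
    static int[] project(List<String> paimonFieldNames, List<String> dorisColumnNames) {
        List<String> lowered = paimonFieldNames.stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());
        return dorisColumnNames.stream().mapToInt(lowered::indexOf).toArray();
    }

    public static void main(String[] args) {
        int[] projected = project(Arrays.asList("Id", "UserName", "ts"),
                Arrays.asList("username", "id"));
        System.out.println(Arrays.toString(projected)); // [1, 0]
    }
}
```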
6 changes: 3 additions & 3 deletions fe/pom.xml
@@ -222,7 +222,7 @@ under the License.
<module>be-java-extensions</module>
</modules>
<properties>
<doris.hive.catalog.shade.version>2.1.1</doris.hive.catalog.shade.version>
<doris.hive.catalog.shade.version>2.1.3</doris.hive.catalog.shade.version>
<avro.version>1.11.4</avro.version>
<parquet.version>1.13.1</parquet.version>
<spark.version>3.4.3</spark.version>
@@ -318,7 +318,7 @@
<!-- ATTN: avro version must be consistent with Iceberg version -->
<!-- Please modify iceberg.version and avro.version together,
you can find avro version info in iceberg mvn repository -->
<iceberg.version>1.4.3</iceberg.version>
<iceberg.version>1.6.1</iceberg.version>
<maxcompute.version>0.49.0-public</maxcompute.version>
<arrow.version>17.0.0</arrow.version>
<presto.hadoop.version>2.7.4-11</presto.hadoop.version>
@@ -365,7 +365,7 @@
<quartz.version>2.3.2</quartz.version>
<aircompressor.version>0.27</aircompressor.version>
<!-- paimon -->
<paimon.version>0.8.1</paimon.version>
<paimon.version>1.0.0</paimon.version>
<disruptor.version>3.4.4</disruptor.version>
<!-- arrow flight sql -->
<arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>
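In summary, the fe/pom.xml hunks bump three properties: doris.hive.catalog.shade.version from 2.1.1 to 2.1.3, iceberg.version from 1.4.3 to 1.6.1, and paimon.version from 0.8.1 to 1.0.0; the Java changes earlier in this diff adapt the FE code to the corresponding Iceberg and Paimon API differences.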