Iceberg/Comet integration POC #9841
Conversation
I think this is the right direction to take. I did an initial high-level pass. Looking forward to having a Comet release soon.
spark/v3.4/build.gradle

compileOnly "org.apache.comet:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.1.0-SNAPSHOT"
I assume this library will only contain the reader, not the operators.
Right. This only contains the reader.
Does it need to be Spark-version dependent? Just wondering.
We are currently running some experiments to see if we can provide a Spark-version-independent jar.
+1 for exploring that.
build.gradle

@@ -45,6 +45,7 @@ buildscript {
}
}

String sparkMajorVersion = '3.4'
I hope we can soon have a snapshot of the Comet jar independent of Spark to clean up the deps here. We can't have the parquet module depend on a jar with any Spark deps.
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

@SuppressWarnings("checkstyle:VisibilityModifier")
These changes would require a bit more time to review. I'll do that tomorrow. I think we would want to restructure the original implementation a bit. Not a concern for now.
We would want to structure this a bit differently. Let me think more.
@aokolnychyi I have addressed the comments. Could you please take one more look when you have a moment? Thanks a lot!
Will check today.
@huaxingao - Hi, is the Comet Parquet reader able to support page skipping / use page indexes? E.g. see #193 for the initial issue for the Iceberg Parquet reader.
@cornelcreanga The Comet Parquet reader doesn't support page skipping yet.
hey @huaxingao
@PaulLiang1 Thank you for your interest! We are currently working on a binary release of DataFusion Comet. Once the binary release is available, I will proceed with this PR.
@huaxingao Thanks
@PaulLiang1 Thanks! I'll check with my colleague tomorrow to find out where we are in the binary release process.
@PaulLiang1 We are pretty close to this and will have a binary release for Comet soon.
got it, thanks for letting me know. please feel free to let us know if there is anything we could help on. thanks!
  initialized = true;
}

private Object convertToSparkValue(T value) {
This is a fragile place. I'll have to come back with fresh eyes.
These conversions look correct to me; they match the internal types Spark specifies for these data types.
I did a detailed pass. My biggest questions:
- Depending directly on shaded classes from Comet. I think this should be hidden by Comet APIs. Not sure it is a blocker, though.
- The Iceberg reader should be the default, and tests have to be parameterized to support Comet (see the sketch after this comment). Where tests fail, there must be no correctness issue. Ideally, we would detect that the read cannot be handled by Comet in SparkBatch.

I have a very limited understanding of how Comet works, so I can't review that part and rely on @huaxingao. I focused primarily on how the integration is done and the impact on the existing reader path. @sunchao @parthchandra, you are welcome to review those parts.
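A sketch of the parameterization the review asks for, assuming JUnit 5; ParquetReaderType mirrors the enum added in this PR, while the test name and configuration flow are hypothetical.

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class ParquetReaderTypeTestSketch {
  // Mirrors the ParquetReaderType enum introduced in this PR.
  enum ParquetReaderType { ICEBERG, COMET }

  @ParameterizedTest
  @EnumSource(ParquetReaderType.class)
  void readProducesSameRowsForBothReaders(ParquetReaderType readerType) {
    // Hypothetical flow: set the reader type on the Spark session, run the
    // same scan with each reader, and assert identical results, so a Comet
    // test failure can never hide a correctness issue.
  }
}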
class CometColumnReader implements VectorizedReader<CometVector> {
  public static final int DEFAULT_BATCH_SIZE = 5000;

  private final DataType sparkType;
Final vars should be grouped.
Fixed. Thanks
Thanks a lot @aokolnychyi for your detailed review! I will
Sounds good. Other than what was mentioned in the review, the change looks good to me. We will have to adapt and run our JMH benchmarks as well. This can be done in a separate PR.
@@ -48,4 +48,7 @@

<!-- Referencing guava classes should be allowed in classes within bundled-guava module -->
<suppress files="org.apache.iceberg.GuavaClasses" id="BanUnrelocatedGuavaClasses"/>
@huaxingao can you log an issue in Comet to address this? CometSchemaImporter is a Comet class but is in the org.apache.arrow.c package to overcome access restrictions (Arrow's SchemaImporter is package-private). We can create a wrapper class to access the schema importer.
Also, we should ideally use the allocator from BatchReader, but that too can be in the wrapper class, I think. There is no issue with using a new allocator for each column, but the Arrow allocator has powerful features in memory accounting that we can take advantage of down the road.
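A minimal sketch of the wrapper idea, assuming only the CometSchemaImporter constructor and close() that appear elsewhere in this thread; the wrapper's name and methods are illustrative, not Comet's actual API.

import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.memory.BufferAllocator;

// Hypothetical Comet-owned wrapper: hides the org.apache.arrow.c placement of
// CometSchemaImporter and reuses a shared allocator (e.g. the BatchReader's)
// instead of allocating a new RootAllocator per column.
public final class CometSchemaImporterWrapper implements AutoCloseable {
  private final CometSchemaImporter importer;

  public CometSchemaImporterWrapper(BufferAllocator allocator) {
    // The PR constructs the importer with a RootAllocator; a BufferAllocator
    // parameter is assumed here so the caller can pass a shared allocator.
    this.importer = new CometSchemaImporter(allocator);
  }

  public CometSchemaImporter importer() {
    return importer;
  }

  @Override
  public void close() {
    importer.close();
  }
}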
  delegate.close();
}

CometSchemaImporter importer = new CometSchemaImporter(new RootAllocator());
Ideally, we should call close on the importer after the column reader is done.
Fixed. Thanks!
private final CometVector vector;
private final ColumnDescriptor descriptor;
private boolean initialized = false;
private int batchSize = DEFAULT_BATCH_SIZE;
Should this match the Comet batch size? https://github.com/apache/datafusion-comet/blob/e9649477c4f8b4c6906244c3cc6828b83f32f735/common/src/main/scala/org/apache/comet/CometConf.scala#L492
I originally matched the default batch size to Iceberg's default, but you are right: it makes more sense to match the Comet default, so I have changed it to the Comet default size.
+1
import org.apache.spark.sql.types.StructField;

class CometColumnReader implements VectorizedReader<CometVector> {
  public static final int DEFAULT_BATCH_SIZE = 5000;
Should this match the Comet batch size? https://github.com/apache/datafusion-comet/blob/e9649477c4f8b4c6906244c3cc6828b83f32f735/common/src/main/scala/org/apache/comet/CometConf.scala#L492
Fixed
private final DataType sparkType;
// the delegated column reader from Comet side
private AbstractColumnReader delegate;
The interaction between CometColumnarBatchReader.delegate and CometColumnReader.delegate is a little confusing. A comment explaining it would be useful.
I added comments in CometColumnarBatchReader
private final CometColumnReader[] readers;
private final boolean hasIsDeletedColumn;
// The delegated batch reader on Comet side
private final BatchReader delegate;
Why do we have this and not use the nextBatch call directly (instead we are explicitly calling readBatch on each column reader)? A comment to explain why would be helpful.
Thank you for adding the comment!
if (dataType == DataTypes.StringType && value instanceof String) {
  return UTF8String.fromString((String) value);
} else if (dataType instanceof DecimalType && value instanceof BigDecimal) {
  return Decimal.apply((BigDecimal) value);
This looked dangerous at first. Maybe add a comment to clarify that this is the Spark Decimal class (which can handle BigDecimal precision)?
Comments are added. Thanks!
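Pulling the quoted fragments together, a self-contained sketch of what a conversion like convertToSparkValue does; the real method in the PR is generic over the constant's type and may cover more cases.

import java.math.BigDecimal;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.unsafe.types.UTF8String;

final class SparkValueConversionSketch {
  private SparkValueConversionSketch() {}

  // Spark's internal row format stores strings as UTF8String and decimals as
  // org.apache.spark.sql.types.Decimal, which preserves BigDecimal precision.
  static Object convertToSparkValue(DataType dataType, Object value) {
    if (dataType == DataTypes.StringType && value instanceof String) {
      return UTF8String.fromString((String) value);
    } else if (dataType instanceof DecimalType && value instanceof BigDecimal) {
      return Decimal.apply((BigDecimal) value);
    }
    // Primitive types already match Spark's internal representation.
    return value;
  }
}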
One more change needed; otherwise the Comet side looks good to me.
class CometColumnReader implements VectorizedReader<ColumnVector> {
  // use the Comet default batch size
  public static final int DEFAULT_BATCH_SIZE = 8192;
+1. In some follow-up PR we can try to pass in the configured value (not just the default).
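A sketch of that follow-up, assuming the batch size is exposed through the Spark session; the key "spark.comet.batchSize" is inferred from the CometConf file linked above and should be verified against the Comet release in use.

import org.apache.spark.sql.SparkSession;

final class CometBatchSizeSketch {
  static final int DEFAULT_BATCH_SIZE = 8192;

  // Reads the configured Comet batch size, falling back to the default.
  static int batchSize(SparkSession spark) {
    String configured =
        spark.conf().get("spark.comet.batchSize", String.valueOf(DEFAULT_BATCH_SIZE));
    return Integer.parseInt(configured);
  }
}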
 * groups.
 */
public void reset() {
  if (delegate != null) {
If delegate is not null here, importer will also be non-null. I think the importer can be reused, so there is no need to close it here, but we overwrite it in the next line, so we should either reuse it or close it. (Closing it is safer.)
Done. Thanks!
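A sketch of the reset() shape after this fix, closing the old importer instead of silently overwriting it. The AbstractColumnReader import path is an assumption; the CometSchemaImporter usage follows the fragments quoted above.

import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.memory.RootAllocator;
import org.apache.comet.parquet.AbstractColumnReader; // assumed package

class CometColumnReaderResetSketch {
  private AbstractColumnReader delegate;
  private CometSchemaImporter importer;

  // Called between row groups: release the Comet delegate and the old
  // importer before creating a fresh importer for the next row group.
  public void reset() {
    if (delegate != null) {
      delegate.close();
      delegate = null;
    }
    if (importer != null) {
      importer.close(); // closing rather than reusing is the safer option
    }
    importer = new CometSchemaImporter(new RootAllocator());
  }
}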
I am OK merging the change if we revert the default reader type and Comet experts approve the logic. I won't block this work because of the dependency on shaded APIs. We will need to parameterize tests to test both reader types and run JMH benchmarks in the future.
Comet side changes look good
Thanks, @huaxingao! Thanks for reviewing, @parthchandra!
@aokolnychyi Thank you so much for reviewing and merging this PR! Also thanks @parthchandra and @RussellSpitzer for reviewing!
This PR shows how I will integrate Comet with Iceberg. The PR doesn't compile yet because we haven't released Comet yet, but it shows how we are going to change the Iceberg code to integrate Comet. Also, Comet doesn't have Spark 3.5 support yet, so I am doing this on 3.4; we will add 3.5 support in Comet.

In VectorizedSparkParquetReaders.buildReader, if the Comet library is available, a CometIcebergColumnarBatchReader will be created, which will use the Comet batch reader to read data. We can also add a property later to control whether we want to use Comet or not.

The logic in CometIcebergVectorizedReaderBuilder is very similar to VectorizedReaderBuilder. It builds a Comet column reader instead of an Iceberg column reader.

The delete logic in CometIcebergColumnarBatchReader is exactly the same as the one in ColumnarBatchReader. I will extract the common code and put it in a base class.

The main motivation of this PR is to improve performance using native execution. Comet's Parquet reader is a hybrid implementation: IO and decompression are done in the JVM while decoding is done natively. There is some performance gain from native decoding, but it is not much. However, by switching to the Comet Parquet reader, Comet will recognize the scan as a Comet scan and will convert the Spark physical plan into a Comet plan for native execution. The major performance gain will come from this native execution.
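A minimal sketch of the selection described above, under the assumption that buildReader branches on a reader type; the method and placeholder names are illustrative, not the PR's exact code.

import org.apache.iceberg.Schema;
import org.apache.parquet.schema.MessageType;

final class ReaderSelectionSketch {
  enum ParquetReaderType { ICEBERG, COMET }

  // If Comet is enabled (and available), build the Comet-backed batch reader;
  // otherwise fall back to the existing Iceberg vectorized reader.
  static Object buildReader(ParquetReaderType type, Schema expected, MessageType fileSchema) {
    if (type == ParquetReaderType.COMET) {
      return buildCometBatchReader(expected, fileSchema);
    }
    return buildIcebergBatchReader(expected, fileSchema);
  }

  private static Object buildCometBatchReader(Schema expected, MessageType fileSchema) {
    throw new UnsupportedOperationException("illustrative placeholder");
  }

  private static Object buildIcebergBatchReader(Schema expected, MessageType fileSchema) {
    throw new UnsupportedOperationException("illustrative placeholder");
  }
}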