
Iceberg/Comet integration POC #9841

Merged · 32 commits into apache:main · Jan 31, 2025

Conversation

@huaxingao (Contributor) commented Mar 1, 2024

This PR shows how I will integrate Comet with Iceberg. The PR doesn't compile yet because we haven't released Comet, but it shows the ideas for how we are going to change Iceberg code to integrate Comet. Also, Comet doesn't have Spark 3.5 support yet, so I am doing this on 3.4; we will add 3.5 support in Comet.

In VectorizedSparkParquetReaders.buildReader, if the Comet library is available, a CometIcebergColumnarBatchReader will be created, which will use the Comet batch reader to read data. We can also add a property later to control whether we want to use Comet or not.

The logic in CometIcebergVectorizedReaderBuilder is very similar to VectorizedReaderBuilder. It builds a Comet column reader instead of an Iceberg column reader.

The delete logic in CometIcebergColumnarBatchReader is exactly the same as the one in ColumnarBatchReader. I will extract the common code into a base class.

The main motivation of this PR is to improve performance using native execution. Comet's Parquet reader is a hybrid implementation: IO and decompression are done in the JVM while decoding is done natively. There is some performance gain from native decoding, but it is modest. However, by switching to the Comet Parquet reader, Comet will recognize that this is a Comet scan and will convert the Spark physical plan into a Comet plan for native execution. The major performance gain will come from this native execution.
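For illustration, here is a hedged sketch of the selection described above. Every name in it (ReaderSelection, useComet, the probe class org.apache.comet.parquet.BatchReader, and the enabling flag) is an assumption for the sketch, not the PR's actual code:

```java
// Sketch only: use the Comet-backed reader when the Comet classes are on
// the classpath and a (hypothetical) property enables it; otherwise fall
// back to Iceberg's own vectorized reader.
final class ReaderSelection {

  private ReaderSelection() {}

  // A reflection probe avoids a hard compile-time dependency on Comet,
  // in line with the compileOnly dependency declared in the build.
  static boolean cometAvailable() {
    try {
      Class.forName("org.apache.comet.parquet.BatchReader");
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  static boolean useComet(boolean cometEnabled) {
    return cometEnabled && cometAvailable();
  }
}
```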

@huaxingao (Contributor Author)

cc @aokolnychyi @sunchao

@aokolnychyi (Contributor) left a comment

I think this is the right direction to take. I did an initial high-level pass. Looking forward to having a Comet release soon.

}

compileOnly "org.apache.comet:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.1.0-SNAPSHOT"
Contributor:

I assume this library will only contain the reader, not the operators.

Contributor Author:

Right. This only contains the reader.

Member:

Does it need to be Spark-version dependent? Just wondering.

Contributor Author:

We are currently running some experiments to see if we can provide a Spark-version-independent jar.

Contributor:

+1 for exploring that.

github-actions bot added the API label Apr 18, 2024
api/src/main/java/org/apache/iceberg/ReaderType.java (outdated, resolved)
build.gradle (outdated)
@@ -45,6 +45,7 @@ buildscript {
}
}

String sparkMajorVersion = '3.4'
Contributor:

I hope we can soon have a snapshot of a Comet jar independent of Spark to clean up the deps here.
We can't have the parquet module depend on a jar with any Spark deps.

spark/v3.4/build.gradle (outdated, resolved)
}

compileOnly "org.apache.comet:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.1.0-SNAPSHOT"
Contributor:

+1 for exploring that.

gradle.properties (outdated, resolved)
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

@SuppressWarnings("checkstyle:VisibilityModifier")
Contributor:

These changes would require a bit more time to review. I'll do that tomorrow. I think we would want to restructure the original implementation a bit. Not a concern for now.

Contributor:

We would want to structure this a bit differently. Let me think more.

github-actions bot removed the API label Apr 26, 2024
@huaxingao (Contributor Author)

@aokolnychyi I have addressed the comments. Could you please take one more look when you have a moment? Thanks a lot!

@aokolnychyi (Contributor)

Will check today.

@cornelcreanga

@huaxingao Hi, is the Comet Parquet reader able to support page skipping / page indexes? E.g., see #193 for the initial issue for the Iceberg Parquet reader.

@huaxingao (Contributor Author)

@cornelcreanga The Comet Parquet reader doesn't support page skipping yet.

huaxingao closed this Jun 20, 2024
huaxingao reopened this Jun 20, 2024
@PaulLiang1

Hey @huaxingao, we are really interested in this feature. Just wondering what we can do to help get this integrated?

@huaxingao (Contributor Author)

@PaulLiang1 Thank you for your interest! We are currently working on a binary release of DataFusion Comet. Once the binary release is available, I will proceed with this PR.

@PaulLiang1

@huaxingao
I think we have an internal build of DataFusion Comet and publish a JAR internally.
Is there anything we can help with on that front?

Thanks

@huaxingao (Contributor Author)

@PaulLiang1 Thanks! I'll check with my colleague tomorrow to find out where we are in the binary release process.

@huaxingao (Contributor Author)

@PaulLiang1 We are pretty close to this and will have a binary release for Comet soon.

@PaulLiang1

> @PaulLiang1 Thanks! I'll check with my colleague tomorrow to find out where we are in the binary release process.

Got it, thanks for letting me know. Please feel free to let us know if there is anything we can help with. Thanks!

initialized = true;
}

private Object convertToSparkValue(T value) {
Contributor:

This is a fragile place. I'll have to come back with fresh eyes.


These conversions look correct to me; they match the internal types Spark specifies for these data types.

@aokolnychyi (Contributor) left a comment

I did a detailed pass. My biggest questions:

  • Depending directly on shaded classes from Comet. I think this should be hidden behind Comet APIs. Not sure it is a blocker, though.
  • The Iceberg reader should be the default, and tests have to be parameterized to support Comet. Where tests fail, there must be no correctness issues. Ideally, we would detect in SparkBatch that a read cannot be handled by Comet.

I have a very limited understanding of how Comet works, so I can't review that part and will rely on @huaxingao. I focused primarily on how the integration is done and the impact on the existing reader path. @sunchao @parthchandra, you are welcome to review those parts.

class CometColumnReader implements VectorizedReader<CometVector> {
  public static final int DEFAULT_BATCH_SIZE = 5000;

  private final DataType sparkType;
Contributor:

Final vars should be grouped.

Contributor Author:

Fixed. Thanks

@huaxingao (Contributor Author)

Thanks a lot @aokolnychyi for your detailed review! I will:

  1. fix the shading problem
  2. change the default to Iceberg in the final version. I defaulted to Comet only for testing purposes, to make sure all the tests pass with Comet.
  3. @parthchandra will help review the Comet part today.

@aokolnychyi (Contributor)

Sounds good. Other than what was mentioned in the review, the change looks good to me.

We will have to adapt and run our JMH benchmarks as well. This can be done in a separate PR.

@@ -48,4 +48,7 @@

<!-- Referencing guava classes should be allowed in classes within bundled-guava module -->
<suppress files="org.apache.iceberg.GuavaClasses" id="BanUnrelocatedGuavaClasses"/>


@huaxingao can you log an issue in Comet to address this? CometSchemaImporter is a Comet class but lives in the org.apache.arrow.c package to overcome access restrictions (Arrow's SchemaImporter is package-private). We can create a wrapper class to access the schema importer.
Also, we should ideally use the allocator from BatchReader, but that too can go in the wrapper class, I think. There is no issue with using a new allocator for each column, but the Arrow allocator has powerful memory-accounting features that we can take advantage of down the road.
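A minimal sketch of that wrapper idea, assuming a hypothetical class name and that CometSchemaImporter exposes a close() method (as the discussion below suggests); this is not Comet's actual API:

```java
import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.memory.RootAllocator;

// Hypothetical Comet-owned wrapper so callers never have to reference the
// org.apache.arrow.c package directly.
public final class SchemaImporterWrapper implements AutoCloseable {
  private final CometSchemaImporter importer;

  // The PR snippet below builds CometSchemaImporter from a RootAllocator;
  // ideally the allocator would come from BatchReader so Arrow's memory
  // accounting covers these allocations as well.
  public SchemaImporterWrapper(RootAllocator allocator) {
    this.importer = new CometSchemaImporter(allocator);
  }

  public CometSchemaImporter importer() {
    return importer;
  }

  @Override
  public void close() {
    importer.close();
  }
}
```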

delegate.close();
}

CometSchemaImporter importer = new CometSchemaImporter(new RootAllocator());


Ideally, we should call close on the importer after the column reader is done.

Contributor Author:

Fixed. Thanks!

private final CometVector vector;
private final ColumnDescriptor descriptor;
private boolean initialized = false;
private int batchSize = DEFAULT_BATCH_SIZE;


Contributor Author:

I originally matched the default batch size to Iceberg's default batch size, but you are right, it makes more sense to match the Comet default, so I have changed it to the Comet default size.


+1

import org.apache.spark.sql.types.StructField;

class CometColumnReader implements VectorizedReader<CometVector> {
  public static final int DEFAULT_BATCH_SIZE = 5000;


Contributor Author:

Fixed


private final DataType sparkType;
// the delegated column reader from Comet side
private AbstractColumnReader delegate;


The interaction between CometColumnarBatchReader.delegate and CometColumnReader.delegate is a little confusing. A comment explaining it would be useful.

Contributor Author:

I added comments in CometColumnarBatchReader.

private final CometColumnReader[] readers;
private final boolean hasIsDeletedColumn;
// The delegated batch reader on Comet side
private final BatchReader delegate;


Why do we have this and not use the nextBatch call directly (instead, we are explicitly calling readBatch on each column reader)? A comment explaining why would be helpful.


Thank you for adding the comment!


if (dataType == DataTypes.StringType && value instanceof String) {
  return UTF8String.fromString((String) value);
} else if (dataType instanceof DecimalType && value instanceof BigDecimal) {
  return Decimal.apply((BigDecimal) value);


This looked dangerous at first. Maybe a comment to clarify that this is the Spark Decimal class (which can handle BigDecimal precision)?

Contributor Author:

Comments are added. Thanks!
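For reference, a hedged sketch of the conversion with the clarifying comments in place (the enclosing class and exact signature are assumptions; the two branches mirror the snippet above):

```java
import java.math.BigDecimal;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.unsafe.types.UTF8String;

final class SparkValueConversion {

  private SparkValueConversion() {}

  // Convert a Java value to the internal representation Spark expects for
  // the given data type.
  static Object convertToSparkValue(DataType dataType, Object value) {
    if (dataType == DataTypes.StringType && value instanceof String) {
      // Spark stores strings internally as UTF8String, not java.lang.String.
      return UTF8String.fromString((String) value);
    } else if (dataType instanceof DecimalType && value instanceof BigDecimal) {
      // This is org.apache.spark.sql.types.Decimal, Spark's internal decimal
      // type; Decimal.apply(BigDecimal) preserves the BigDecimal's precision.
      return Decimal.apply((BigDecimal) value);
    }
    return value;
  }
}
```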

@parthchandra left a comment

One more change needed; otherwise, the Comet side looks good to me.


class CometColumnReader implements VectorizedReader<ColumnVector> {
  // use the Comet default batch size
  public static final int DEFAULT_BATCH_SIZE = 8192;


+1. In some follow-up PR we can try to pass in the configured value (not just the default).
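A small sketch of what that follow-up might look like, assuming the value is exposed through the session conf under a key named spark.comet.batchSize (the property name is an assumption to verify against Comet's configuration):

```java
import org.apache.spark.sql.SparkSession;

final class BatchSizeConf {
  // Comet's default batch size, per the constant above.
  static final int DEFAULT_BATCH_SIZE = 8192;

  private BatchSizeConf() {}

  // Read the configured batch size from the session conf, falling back to
  // the default when the property is unset.
  static int configuredBatchSize(SparkSession spark) {
    String value =
        spark.conf().get("spark.comet.batchSize", String.valueOf(DEFAULT_BATCH_SIZE));
    return Integer.parseInt(value);
  }
}
```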



 * groups.
 */
public void reset() {
  if (delegate != null) {


If delegate is not null here, importer will also be non-null. I think the importer can be reused, so there is no need to close it here, but we overwrite it on the next line, so we should either reuse it or close it. (Closing it is safer.)

Contributor Author:

Done. Thanks!
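For reference, a hedged sketch of the resulting reset(), with field names taken from the quoted snippets (the surrounding class shape and the Comet package locations are assumptions):

```java
import org.apache.arrow.c.CometSchemaImporter;
import org.apache.comet.parquet.AbstractColumnReader;

// Partial sketch of the per-row-group lifecycle handling discussed above.
abstract class CometColumnReaderSketch {
  // the delegated column reader from the Comet side
  private AbstractColumnReader delegate;
  private CometSchemaImporter importer;

  // Close both the delegated reader and the importer so that neither is
  // leaked when the reader is reset between row groups.
  public void reset() {
    if (delegate != null) {
      delegate.close();
      delegate = null;
    }

    if (importer != null) {
      importer.close();
      importer = null;
    }
  }
}
```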

@aokolnychyi (Contributor) commented Jan 31, 2025

I am OK merging the change if we revert the default reader type and Comet experts approve the logic. I won't block this work because of the dependency on shaded APIs.

We will need to parameterize tests to test both reader types and run JMH benchmarks in the future.

@parthchandra left a comment

Comet-side changes look good.

aokolnychyi merged commit 40334f5 into apache:main Jan 31, 2025
46 checks passed
@aokolnychyi (Contributor)

Thanks, @huaxingao! Thanks for reviewing, @parthchandra!

@huaxingao (Contributor Author)

@aokolnychyi Thank you so much for reviewing and merging this PR! Also thanks to @parthchandra and @RussellSpitzer for reviewing!

huaxingao deleted the comet3 branch January 31, 2025 23:39
jbonofre pushed a commit to jbonofre/iceberg that referenced this pull request Feb 3, 2025