Relax decimal metadata checks for mismatched precision/scale [databricks] #12060

Open
wants to merge 4 commits into base: branch-25.02
29 changes: 10 additions & 19 deletions integration_tests/src/main/python/parquet_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
# Copyright (c) 2020-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -1500,10 +1500,17 @@ def test_parquet_check_schema_compatibility_nested_types(spark_tmp_path):
(DecimalGen(7, 4), DecimalGen(5, 2)),
(DecimalGen(10, 7), DecimalGen(5, 2)),
(DecimalGen(20, 17), DecimalGen(5, 2)),
# Narrowing precision
(DecimalGen(20, 0), DecimalGen(10, 0)),
# Increasing precision and decreasing scale
(DecimalGen(5, 4), DecimalGen(7, 2)),
Collaborator commented:

nit: So I mapped out all of the tests here. I looked at any change in the underlying data type, crossed with whether the scale increased, stayed the same, or decreased, and whether the whole part (i.e. the precision minus the scale) increased, stayed the same, or decreased.

| data type | 32->32 | 64->64 | 128->128 | 32->64 | 32->128 | 64->128 | 128->64 | 128->32 | 128->64 | 64->32 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| scale same/whole same | N/A noop | N/A noop | N/A noop | N/A impossible | N/A impossible | N/A impossible | N/A impossible | N/A impossible | N/A impossible | N/A impossible |
| scale same/whole increase | | | | | | | N/A impossible | N/A impossible | N/A impossible | N/A impossible |
| scale same/whole decrease | | | | N/A impossible | N/A impossible | N/A impossible | (20,0)->(10,0) | | | |
| scale increase/whole same | (5,2)->(7,4), (5,2)->(6,3) | (10,2)->(12,4) | (20,2)->(22,4) | (5,2)->(10,7) | (5,2)->(20,17) | (10,2)->(20,12) | N/A impossible | N/A impossible | N/A impossible | N/A impossible |
| scale increase/whole increase | | | | (5,2)->(12,5) | (5,2)->(22,10) | | N/A impossible | N/A impossible | N/A impossible | N/A impossible |
| scale increase/whole decrease | (5,2)->(6,4) | (10,4)->(12,7) | | | | | | | | |
| scale decrease/whole same | (7,4)->(5,2) | | | N/A impossible | N/A impossible | N/A impossible | | (20,17)->(5,2) | | (10,7)->(5,2) |
| scale decrease/whole increase | (5,4)->(7,2) | (10,6)->(12,4) | (20,7)->(22,5) | | | | | | | |
| scale decrease/whole decrease | | | | N/A impossible | N/A impossible | N/A impossible | | | | |

I don't expect all of the boxes to be filled. I don't think we need exhaustive tests, but I noticed that

    (DecimalGen(5, 2), DecimalGen(6, 3)),

does not actually increase the precision by a larger amount than the scale: the scale increased by 1 and so did the precision, so the whole part stayed the same, just like for (5,2)->(7,4).

Could we get one or two tests for when the scale stays the same and the whole part increases, and similarly for when the scale decreases and so does the whole part?

I don't think this is going to improve the coverage massively.
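
To make the categorization above concrete, here is a minimal Python sketch (not part of this PR; the helper names are illustrative only) that classifies a (from, to) decimal change by storage width and by the scale/whole-part trend used in the table, assuming Spark's usual precision thresholds of 9 digits for 32-bit and 18 digits for 64-bit decimals:

```python
# Minimal sketch: classify a decimal type change the way the table above does.
# Assumes precision <= 9 -> 32-bit, <= 18 -> 64-bit, otherwise 128-bit.

def storage_bits(precision):
    if precision <= 9:
        return 32
    elif precision <= 18:
        return 64
    return 128

def categorize(from_prec, from_scale, to_prec, to_scale):
    def trend(delta):
        return "same" if delta == 0 else ("increase" if delta > 0 else "decrease")
    scale_delta = to_scale - from_scale
    # The "whole" part is precision - scale (digits left of the decimal point).
    whole_delta = (to_prec - to_scale) - (from_prec - from_scale)
    return (f"{storage_bits(from_prec)}->{storage_bits(to_prec)}",
            f"scale {trend(scale_delta)}/whole {trend(whole_delta)}")

# (5,2) -> (6,3) grows precision and scale by the same amount, so the whole
# part is unchanged, which is the point about that test case:
print(categorize(5, 2, 6, 3))    # ('32->32', 'scale increase/whole same')
print(categorize(20, 0, 10, 0))  # ('128->64', 'scale same/whole decrease')
```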

(DecimalGen(10, 6), DecimalGen(12, 4)),
(DecimalGen(20, 7), DecimalGen(22, 5)),
# Increasing the precision and keeping the scale same (increasing the whole number part)
(DecimalGen(10, 2), DecimalGen(22, 2)),
# Decreasing the scale and keeping the precision same (decreasing the whole number part)
(DecimalGen(10, 5), DecimalGen(10, 2)),
(DecimalGen(20, 10), DecimalGen(20, 5)),
# Increasing precision by a smaller amount than scale
(DecimalGen(5, 2), DecimalGen(6, 4)),
(DecimalGen(10, 4), DecimalGen(12, 7))
@@ -1524,13 +1531,6 @@ def test_parquet_decimal_precision_scale_change(spark_tmp_path, from_decimal_gen
StructField("a", to_decimal_gen.data_type)
])

# Determine if we expect an error based on precision and scale changes
expect_error = (
to_decimal_gen.scale < from_decimal_gen.scale or
(to_decimal_gen.precision - to_decimal_gen.scale) <
(from_decimal_gen.precision - from_decimal_gen.scale)
)

spark_conf = {}
if is_before_spark_400():
# In Spark versions earlier than 4.0, the vectorized Parquet reader throws an exception
@@ -1539,17 +1539,8 @@ def test_parquet_decimal_precision_scale_change(spark_tmp_path, from_decimal_gen
# is ignored by the plugin.
spark_conf['spark.sql.parquet.enableVectorizedReader'] = 'false'

if expect_error:
assert_gpu_and_cpu_error(
lambda spark: spark.read.schema(read_schema).parquet(data_path).collect(),
conf={},
error_message="Parquet column cannot be converted"
)
else:
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.schema(read_schema).parquet(data_path),
conf=spark_conf
)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.schema(read_schema).parquet(data_path), conf=spark_conf)


@pytest.mark.skipif(is_before_spark_320() or is_spark_321cdh(), reason='Encryption is not supported before Spark 3.2.0 or Parquet < 1.12')
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1616,7 +1616,7 @@ object GpuCast {
}
}

private def castDecimalToDecimal(
def castDecimalToDecimal(
input: ColumnView,
from: DecimalType,
to: DecimalType,
@@ -1033,21 +1033,13 @@ private case class GpuParquetFileFilterHandler(
private def canReadAsDecimal(pt: PrimitiveType, dt: DataType): Boolean = {
(DecimalType.is32BitDecimalType(dt)
|| DecimalType.is64BitDecimalType(dt)
|| DecimalType.isByteArrayDecimalType(dt)) && isDecimalTypeMatched(pt.getDecimalMetadata, dt)
|| DecimalType.isByteArrayDecimalType(dt)) && isValidDecimalType(pt.getDecimalMetadata)
}

// TODO: After we deprecate Spark 3.1, fetch decimal meta with DecimalLogicalTypeAnnotation
@scala.annotation.nowarn("msg=class DecimalMetadata in package schema is deprecated")
private def isDecimalTypeMatched(metadata: DecimalMetadata,
sparkType: DataType): Boolean = {
if (metadata == null) {
false
} else {
val dt = sparkType.asInstanceOf[DecimalType]
val scaleIncrease = dt.scale - metadata.getScale
val precisionIncrease = dt.precision - metadata.getPrecision
scaleIncrease >= 0 && precisionIncrease >= scaleIncrease
}
private def isValidDecimalType(metadata: DecimalMetadata): Boolean = {
metadata != null
}
}

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -576,7 +576,10 @@ object ParquetSchemaUtils {
private def evolveSchemaCasts(cv: ColumnView, dt: DataType, originalFromDt: DataType)
: ColumnView = {
if (needDecimalCast(cv, dt)) {
cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType]))
val fromDecimal = originalFromDt.asInstanceOf[DecimalType]
val toDecimal = dt.asInstanceOf[DecimalType]
val ansiMode = CastOptions.DEFAULT_CAST_OPTIONS.isAnsiMode
GpuCast.castDecimalToDecimal(cv, fromDecimal, toDecimal, ansiMode)
} else if (needUnsignedToSignedCast(cv, dt) || needInt32Downcast(cv, dt) ||
needSignedUpcast(cv, dt)) {
cv.castTo(DType.create(GpuColumnVector.getNonNestedRapidsType(dt).getTypeId))
@@ -85,7 +85,7 @@ class RapidsTestSettings extends BackendTestSettings {
.exclude("unannotated array of struct with unannotated array", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11476"))
enableSuite[RapidsParquetQuerySuite]
.exclude("SPARK-26677: negated null-safe equality comparison should not filter matched row groups", ADJUST_UT("fetches the CPU version of Execution Plan instead of the GPU version."))
.exclude("SPARK-34212 Parquet should read decimals correctly", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11433"))
.exclude("SPARK-34212 Parquet should read decimals correctly", ADJUST_UT("Vectorized Parquet reader throws an exception when scale is narrowed in Apache Spark where as the spark-rapids plugin does not."))
enableSuite[RapidsParquetRebaseDatetimeSuite]
.exclude("SPARK-31159, SPARK-37705: compatibility with Spark 2.4/3.2 in reading dates/timestamps", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11599"))
.exclude("SPARK-31159, SPARK-37705: rebasing timestamps in write", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11593"))