From 98e94af58f1614e74b040057e71bd6402b85e67c Mon Sep 17 00:00:00 2001 From: cashmand Date: Tue, 3 Dec 2024 23:08:22 +0800 Subject: [PATCH] [SPARK-48898][SQL] Fix Variant shredding bug ### What changes were proposed in this pull request? In VariantShreddingWriter, there are two calls to `variantBuilder.appendVariant` that were left over from an earlier version of the shredding spec where we constructed new metadata for every shredded value. This method rebuilds the Variant value to refer to the new metadata dictionary in the builder, so we should not be using it in shredding, where all dictionary IDs now refer to the original Variant metadata. 1) When writing a Variant value that does not match the shredding type. The code was doing the right thing, but unnecessarily calling `variantBuilder.appendVariant` and then discarding the result. The PR removes that dead code. 2) When reconstructing a Variant object that contains only the fields of the original object that don't appear in the shredding schema. This is a correctness bug, since we would modify the value to use new dictionary IDs that do not correspond to the ones in the original metadata. ### Why are the changes needed? Variant shredding correctness. ### Does this PR introduce _any_ user-facing change? No, shredding has not yet been released. ### How was this patch tested? Added a unit test that fails without the fix. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #49031 from cashmand/SPARK-48898-bugfix. Authored-by: cashmand Signed-off-by: Wenchen Fan --- .../spark/types/variant/VariantShreddingWriter.java | 6 +++--- .../apache/spark/sql/VariantWriteShreddingSuite.scala | 11 +++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java b/common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java index b5f8ea0a1484b..bbee7ee0dca38 100644 --- a/common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java +++ b/common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java @@ -101,7 +101,9 @@ public static ShreddedResult castShredded( int id = v.getDictionaryIdAtIndex(i); fieldEntries.add(new VariantBuilder.FieldEntry( field.key, id, variantBuilder.getWritePos() - start)); - variantBuilder.appendVariant(field.value); + // shallowAppendVariant is needed for correctness, since we're relying on the metadata IDs + // being unchanged. + variantBuilder.shallowAppendVariant(field.value); } } if (numFieldsMatched < objectSchema.length) { @@ -133,8 +135,6 @@ public static ShreddedResult castShredded( // Store the typed value. result.addScalar(typedValue); } else { - VariantBuilder variantBuilder = new VariantBuilder(false); - variantBuilder.appendVariant(v); result.addVariantValue(v.getValue()); } } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala index ed66ddb1f0f44..a62c6e4462464 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala @@ -179,6 +179,17 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper // Not an object testWithSchema(obj, ArrayType(StructType.fromDDL("a int, b string")), Row(obj.getMetadata, untypedValue(obj), null)) + + // Similar to the case above where "b" was not in the shredding schema, but with the unshredded + // value being an object. Check that the copied value has correct dictionary IDs. + val obj2 = parseJson("""{"a": 1, "b": {"c": "hello"}}""") + val residual2 = untypedValue("""{"b": {"c": "hello"}}""") + // First byte is the type, second is number of fields, and the third is the ID for "b" + residual2(2) = 1 + // Followed by 2 bytes for offsets, inner object type and number of fields, then ID for "c". + residual2(7) = 2 + testWithSchema(obj2, StructType.fromDDL("a int, c string"), + Row(obj2.getMetadata, residual2, Row(Row(null, 1), Row(null, null)))) } test("shredding as array") {