From c85e7e51c5adfe022fb16095d8630f72b835b576 Mon Sep 17 00:00:00 2001
From: "joey.ljy"
Date: Wed, 20 Nov 2024 10:11:01 +0800
Subject: [PATCH] Add compression codec extension to velox written parquet file

---
 .../execution/VeloxParquetWriteSuite.scala  | 22 ++++++++++-
 cpp/velox/substrait/SubstraitToVeloxPlan.cc | 38 +++++++++++++++++--
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index 4c76c753b90e..708bfdf9d000 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.util.Utils
 
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.junit.Assert
 
@@ -31,6 +32,22 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"
 
+  // The parquet compression short names
+  private val shortParquetCompressionCodecNames = Map(
+    "none" -> CompressionCodecName.UNCOMPRESSED,
+    "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
+    "snappy" -> CompressionCodecName.SNAPPY,
+    "gzip" -> CompressionCodecName.GZIP,
+    "lzo" -> CompressionCodecName.LZO,
+    "lz4" -> CompressionCodecName.LZ4,
+    "brotli" -> CompressionCodecName.BROTLI,
+    "zstd" -> CompressionCodecName.ZSTD
+  )
+
+  private def getParquetFileExtension(codec: String): String = {
+    s"${shortParquetCompressionCodecNames(codec).getExtension}.parquet"
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     createTPCHNotNullTables()
@@ -49,8 +66,8 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
           spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
         }
         assert(
-          testAppender.loggingEvents.exists(
-            _.getMessage.toString.contains("Use Gluten parquet write for hive")) == false)
+          !testAppender.loggingEvents.exists(
+            _.getMessage.toString.contains("Use Gluten parquet write for hive")))
     }
   }
 
@@ -77,6 +94,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
             parquetFiles.forall {
              file =>
                 val path = new Path(f.getCanonicalPath, file)
+                assert(file.endsWith(getParquetFileExtension(codec)))
                 val in = HadoopInputFile.fromPath(path, spark.sessionState.newHadoopConf())
                 Utils.tryWithResource(ParquetFileReader.open(in)) {
                   reader =>
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index cdd9269e1494..1efa7338796d 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -493,13 +493,43 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   }
 }
 
+std::string makeUuid() {
+  return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
+}
+
+std::string compressionFileNameSuffix(common::CompressionKind kind) {
+  switch (static_cast<int32_t>(kind)) {
+    case common::CompressionKind_ZLIB:
+      return ".zlib";
+    case common::CompressionKind_SNAPPY:
+      return ".snappy";
+    case common::CompressionKind_LZO:
+      return ".lzo";
+    case common::CompressionKind_ZSTD:
+      return ".zstd";
+    case common::CompressionKind_LZ4:
+      return ".lz4";
+    case common::CompressionKind_GZIP:
+      return ".gz";
+    case common::CompressionKind_NONE:
+    default:
+      return "";
+  }
+}
+
 std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
     const std::string& targetDirectory,
+    dwio::common::FileFormat fileFormat,
+    common::CompressionKind compression,
     const std::optional<std::string>& writeDirectory = std::nullopt,
     const connector::hive::LocationHandle::TableType& tableType =
         connector::hive::LocationHandle::TableType::kExisting) {
+  std::string targetFileName = "";
+  if (fileFormat == dwio::common::FileFormat::PARQUET) {
+    targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(), compressionFileNameSuffix(compression), ".parquet");
+  }
   return std::make_shared<connector::hive::LocationHandle>(
-      targetDirectory, writeDirectory.value_or(targetDirectory), tableType);
+      targetDirectory, writeDirectory.value_or(targetDirectory), tableType, targetFileName);
 }
 
 std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandle(
@@ -615,6 +645,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
 
   // Do not hard-code connector ID and allow for connectors other than Hive.
   static const std::string kHiveConnectorId = "test-hive";
+  // Currently only support parquet format.
+  dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;
 
   return std::make_shared<core::TableWriteNode>(
       nextPlanNodeId(),
@@ -628,8 +660,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
               inputType->children(),
              partitionedKey,
              nullptr /*bucketProperty*/,
-              makeLocationHandle(writePath),
-              dwio::common::FileFormat::PARQUET, // Currently only support parquet format.
+              makeLocationHandle(writePath, fileFormat, compressionCodec),
+              fileFormat,
              compressionCodec)),
      (!partitionedKey.empty()),
      exec::TableWriteTraits::outputType(nullptr),