Skip to content

Commit

Permalink
Add compression codec extension to Velox-written Parquet file
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Nov 20, 2024
1 parent 5d7b963 commit 21e3f00
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
override protected val resourcePath: String = "/tpch-data-parquet"
override protected val fileFormat: String = "parquet"

// Maps a Parquet compression codec short name (as accepted by
// `spark.sql.parquet.compression.codec`) to the extension Velox embeds in
// the written file name. "none"/"uncompressed" contribute no extension.
private val parquetCompressionCodecExtensions = Map(
  "none" -> "",
  "uncompressed" -> "",
  "snappy" -> ".snappy",
  "gzip" -> ".gz",
  "lzo" -> ".lzo",
  "lz4" -> ".lz4",
  "brotli" -> ".br",
  "zstd" -> ".zstd"
)

// Returns the full file extension (codec suffix + ".parquet") expected for a
// file written with the given codec, e.g. "gzip" -> ".gz.parquet".
// Fails with a descriptive error instead of a bare NoSuchElementException
// ("key not found") when the codec is not a known Parquet codec; lookup is
// case-insensitive since Spark accepts codec names in any case.
private def getParquetFileExtension(codec: String): String = {
  val suffix = parquetCompressionCodecExtensions.getOrElse(
    codec.toLowerCase,
    throw new IllegalArgumentException(s"Unknown parquet compression codec: $codec"))
  s"$suffix.parquet"
}

override def beforeAll(): Unit = {
super.beforeAll()
createTPCHNotNullTables()
Expand All @@ -49,8 +65,8 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
}
assert(
testAppender.loggingEvents.exists(
_.getMessage.toString.contains("Use Gluten parquet write for hive")) == false)
!testAppender.loggingEvents.exists(
_.getMessage.toString.contains("Use Gluten parquet write for hive")))
}
}

Expand All @@ -77,6 +93,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
parquetFiles.forall {
file =>
val path = new Path(f.getCanonicalPath, file)
assert(file.endsWith(getParquetFileExtension(codec)))
val in = HadoopInputFile.fromPath(path, spark.sessionState.newHadoopConf())
Utils.tryWithResource(ParquetFileReader.open(in)) {
reader =>
Expand Down
38 changes: 35 additions & 3 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,13 +493,43 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}
}

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}

// Maps a Velox compression kind to the suffix conventionally embedded in the
// output file name (e.g. ".gz" for gzip). Uncompressed and unrecognized
// kinds yield an empty suffix.
std::string compressionFileNameSuffix(common::CompressionKind kind) {
  // Switch on the underlying integer value so any out-of-range kind falls
  // through to the empty-suffix default.
  switch (static_cast<int32_t>(kind)) {
    case common::CompressionKind_SNAPPY:
      return ".snappy";
    case common::CompressionKind_GZIP:
      return ".gz";
    case common::CompressionKind_ZSTD:
      return ".zstd";
    case common::CompressionKind_ZLIB:
      return ".zlib";
    case common::CompressionKind_LZ4:
      return ".lz4";
    case common::CompressionKind_LZO:
      return ".lzo";
    case common::CompressionKind_NONE:
    default:
      return "";
  }
}

// Builds a LocationHandle for the table write.
//
// For Parquet output an explicit target file name is generated so the
// compression codec's suffix (see compressionFileNameSuffix) appears in the
// file name, e.g. "gluten-part-<uuid>.gz.parquet"; other formats leave the
// name empty and let the writer pick one.
//
// NOTE: the pasted diff contained both the pre-change and post-change return
// statements; the stale pre-change line (which dropped targetFileName and
// left an unbalanced trailing statement) has been removed here.
std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
    const std::string& targetDirectory,
    dwio::common::FileFormat fileFormat,
    common::CompressionKind compression,
    const std::optional<std::string>& writeDirectory = std::nullopt,
    const connector::hive::LocationHandle::TableType& tableType =
        connector::hive::LocationHandle::TableType::kExisting) {
  std::string targetFileName = "";
  if (fileFormat == dwio::common::FileFormat::PARQUET) {
    targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(), compressionFileNameSuffix(compression), ".parquet");
  }
  // writeDirectory falls back to targetDirectory when not supplied.
  return std::make_shared<connector::hive::LocationHandle>(
      targetDirectory, writeDirectory.value_or(targetDirectory), tableType, targetFileName);
}

std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandle(
Expand Down Expand Up @@ -615,6 +645,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::

// Do not hard-code connector ID and allow for connectors other than Hive.
static const std::string kHiveConnectorId = "test-hive";
// Currently only support parquet format.
dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;

return std::make_shared<core::TableWriteNode>(
nextPlanNodeId(),
Expand All @@ -628,8 +660,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
inputType->children(),
partitionedKey,
nullptr /*bucketProperty*/,
makeLocationHandle(writePath),
dwio::common::FileFormat::PARQUET, // Currently only support parquet format.
makeLocationHandle(writePath, fileFormat, compressionCodec),
fileFormat,
compressionCodec)),
(!partitionedKey.empty()),
exec::TableWriteTraits::outputType(nullptr),
Expand Down

0 comments on commit 21e3f00

Please sign in to comment.