SNOW-1788445: Intermediate stage file is not cleaned up after uploading in PARQUET format (#593)
sfc-gh-yuwang authored Nov 5, 2024
1 parent 452d89b commit bc76a33
Showing 2 changed files with 23 additions and 13 deletions.
33 changes: 21 additions & 12 deletions src/it/scala/net/snowflake/spark/snowflake/ParquetSuite.scala
@@ -26,18 +26,19 @@ class ParquetSuite extends IntegrationSuiteBase {
   val test_nested_dataframe: String = Random.alphanumeric.filter(_.isLetter).take(10).mkString
 
   override def afterAll(): Unit = {
-    runSql(s"drop table if exists $test_all_type")
-    runSql(s"drop table if exists $test_all_type_multi_line")
-    runSql(s"drop table if exists $test_array_map")
-    runSql(s"drop table if exists $test_conversion")
-    runSql(s"drop table if exists $test_conversion_by_name")
-    runSql(s"drop table if exists $test_column_map")
-    runSql(s"drop table if exists $test_trim")
-    runSql(s"drop table if exists $test_date")
-    runSql(s"drop table if exists $test_special_char")
-    runSql(s"drop table if exists $test_special_char_to_exist")
-    runSql(s"drop table if exists $test_column_map_parquet")
-    runSql(s"drop table if exists $test_column_map_not_match")
+    jdbcUpdate(s"drop table if exists $test_all_type")
+    jdbcUpdate(s"drop table if exists $test_all_type_multi_line")
+    jdbcUpdate(s"drop table if exists $test_array_map")
+    jdbcUpdate(s"drop table if exists $test_conversion")
+    jdbcUpdate(s"drop table if exists $test_conversion_by_name")
+    jdbcUpdate(s"drop table if exists $test_column_map")
+    jdbcUpdate(s"drop table if exists $test_trim")
+    jdbcUpdate(s"drop table if exists $test_date")
+    jdbcUpdate(s"drop table if exists $test_special_char")
+    jdbcUpdate(s"drop table if exists $test_special_char_to_exist")
+    jdbcUpdate(s"drop table if exists $test_column_map_parquet")
+    jdbcUpdate(s"drop table if exists $test_column_map_not_match")
+    jdbcUpdate(s"drop table if exists $test_nested_dataframe")
     super.afterAll()
   }
 
@@ -90,6 +91,10 @@ class ParquetSuite extends IntegrationSuiteBase {
       )
     )
     checkAnswer(newDf, expectedAnswer)
+
+    // assert no staging table is left
+    val res = sparkSession.sql(s"show tables like '%${test_all_type}_STAGING%'").collect()
+    assert(res.length == 0)
   }
 
   test("test parquet with all type and multiple lines"){
@@ -258,6 +263,10 @@ class ParquetSuite extends IntegrationSuiteBase {
     checkAnswer(newDf, List(Row(2, 1, 4, 3)))
     assert(newDf.schema.map(field => field.name)
       .mkString(",") == Seq("ONE", "TWO", "THREE", """"Fo.ur"""").mkString(","))
+
+    // assert no staging table is left
+    val res = sparkSession.sql(s"show tables like '%${test_conversion_by_name}_STAGING%'").collect()
+    assert(res.length == 0)
   }
 
   test("test parquet name conversion with column map"){
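Both test additions follow the same pattern: after a Parquet-format write, query SHOW TABLES for anything matching the write's <table>_STAGING name and expect an empty result. A minimal sketch of that check factored into a helper (the helper name is hypothetical; the sparkSession call and the _STAGING suffix come from the test code above):

import org.apache.spark.sql.SparkSession

// Hypothetical helper: fails if an intermediate table matching
// "<table>_STAGING" is still visible after a write has finished.
def assertNoStagingTable(spark: SparkSession, table: String): Unit = {
  val leftovers = spark.sql(s"show tables like '%${table}_STAGING%'").collect()
  assert(leftovers.isEmpty, s"staging table left behind for $table")
}

With such a helper, the two new assertions reduce to assertNoStagingTable(sparkSession, test_all_type) and assertNoStagingTable(sparkSession, test_conversion_by_name).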
3 changes: 2 additions & 1 deletion src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala
@@ -423,7 +423,7 @@ private[io] object StageWriter {
     if (params.useParquetInWrite()){
       // temporary table to store parquet file
       conn.createTable(tempTable.name, schema, params,
-        overwrite = false, temporary = false)
+        overwrite = false, temporary = true)
 
       if (saveMode == SaveMode.Overwrite){
         conn.createTable(relayTable.name, params.toSnowflakeSchema(schema), params,
@@ -501,6 +501,7 @@ private[io] object StageWriter {
           params.toSnowflakeSchema(schema), schema, params)
         conn.commit()
       }
+      conn.dropTable(tempTable.name)
     } else {
       if (saveMode == SaveMode.Overwrite && params.useStagingTable) {
         if (tableExists) {
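Taken together, the two StageWriter hunks bound the intermediate table's lifetime twice over: the table is now created with temporary = true, so it is scoped to the session and disappears even if an explicit drop never runs, and conn.dropTable removes it eagerly once the copy into the target table has committed. A standalone sketch of that lifecycle over plain JDBC, assuming an open Snowflake connection with autocommit disabled (the table names, stage name, and COPY options are placeholders, not the connector's internals):

import java.sql.Connection

// Illustrative only: create the intermediate table as TEMPORARY so the
// session cleans it up on exit, then drop it explicitly once the data
// has reached the target table.
def writeViaTempTable(conn: Connection, tempTable: String, targetTable: String): Unit = {
  val stmt = conn.createStatement()
  try {
    stmt.executeUpdate(s"create temporary table $tempTable like $targetTable")
    stmt.executeUpdate(
      s"copy into $tempTable from @parquet_stage file_format = (type = parquet)")
    stmt.executeUpdate(s"insert into $targetTable select * from $tempTable")
    conn.commit()
  } finally {
    stmt.executeUpdate(s"drop table if exists $tempTable") // eager cleanup
    stmt.close()
  }
}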
