From 444a633ff3997dfbf60bb3e96b4f67308f46d242 Mon Sep 17 00:00:00 2001 From: Adrian Date: Thu, 30 May 2024 10:24:33 -0400 Subject: [PATCH] fix: CQDG-00 fix config --- src/main/scala/bio/ferlab/HPOMain.scala | 28 ++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/main/scala/bio/ferlab/HPOMain.scala b/src/main/scala/bio/ferlab/HPOMain.scala index 321f789..ec9238b 100644 --- a/src/main/scala/bio/ferlab/HPOMain.scala +++ b/src/main/scala/bio/ferlab/HPOMain.scala @@ -3,6 +3,8 @@ package bio.ferlab import bio.ferlab.config.Config import bio.ferlab.ontology.{ICDTerm, OntologyTerm} import bio.ferlab.transform.{DownloadTransformer, WriteJson, WriteParquet} +import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.CORRECTED import org.apache.spark.sql.{SaveMode, SparkSession} import pureconfig.ConfigReader.Result import pureconfig._ @@ -16,8 +18,32 @@ object HPOMain extends App { .load[Config] .getOrElse(throw new Exception("Wrong Configuration")) + + + + + private val defaultConfigs = Map( + "spark.databricks.delta.merge.repartitionBeforeWrite.enabled"->"true", + "spark.databricks.delta.retentionDurationCheck.enabled"->"false", + "spark.databricks.delta.schema.autoMerge.enabled"->"true", + "spark.delta.merge.repartitionBeforeWrite"->"true", + "spark.sql.autoBroadcastJoinThreshold"->"-1", + "spark.sql.catalog.spark_catalog"->"org.apache.spark.sql.delta.catalog.DeltaCatalog", + "spark.sql.extensions"->"io.delta.sql.DeltaSparkSessionExtension", + "spark.sql.legacy.parquet.datetimeRebaseModeInWrite"->"CORRECTED", + "spark.sql.legacy.timeParserPolicy"->"CORRECTED", + "spark.sql.mapKeyDedupPolicy"->"LAST_WIN" + ) + + val sparkConfigs: SparkConf = + defaultConfigs + .foldLeft(new SparkConf()){ case (c, (k, v)) => c.set(k, v) } + + implicit val spark: SparkSession = SparkSession.builder - .appName("HPO") + .config(sparkConfigs) + .appName("HPOMain") + .enableHiveSupport() .master("local[*]") // .config("fs.s3a.path.style.access", s"${config.aws.pathStyleAccess}") // .config("fs.s3a.endpoint", s"${config.aws.endpoint}")