diff --git a/build.sbt b/build.sbt
index cdac9e1a..f876b7e5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -10,13 +10,13 @@ lazy val supportedScalaVersions = List(scala212)
 scalacOptions ++= Seq("-deprecation", "-Ypartial-unification")
 javacOptions ++= Seq("-source", "11", "-target", "11", "-Xlint")
 
-val spark3Version = "3.4.2"
-val catsVersion = "2.7.0"
-val scalatestVersion = "3.2.12"
-val pureconfigVersion = "0.17.2"
+val spark3Version = "3.5.1"
+val catsVersion = "2.10.0"
+val scalatestVersion = "3.2.17"
+val pureconfigVersion = "0.17.6"
 val elasticsearchVersion = "7.15.0"
-val deltaVersion = "2.4.0"
-val glowVersion = "1.2.1"
+val deltaVersion = "3.1.0"
+val glowVersion = "2.0.0"
 
 updateOptions := updateOptions.value.withGigahorse(false)
 
@@ -24,20 +24,17 @@ lazy val `datalake-commons` = (project in file("datalake-commons"))
   .settings(
     scalaVersion := scala212,
     libraryDependencies += "com.github.pureconfig" %% "pureconfig" % pureconfigVersion,
-    libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.9.0",
+    libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
     libraryDependencies += "org.typelevel" %% "cats-core" % catsVersion,
     libraryDependencies += "org.scalatest" %% "scalatest" % scalatestVersion,
     libraryDependencies += "io.projectglow" %% "glow-spark3" % glowVersion % Provided exclude("org.apache.hadoop", "hadoop-client"),
     libraryDependencies += "com.outr" %% "hasher" % "1.2.2",
-    libraryDependencies += "com.lihaoyi"%% "mainargs" % "0.5.0",
+    libraryDependencies += "com.lihaoyi"%% "mainargs" % "0.6.1",
     libraryDependencies += "org.apache.spark" %% "spark-core" % spark3Version % Provided,
     libraryDependencies += "org.apache.spark" %% "spark-sql" % spark3Version % Provided,
     dependencyOverrides ++= Seq(
-      "org.apache.commons" % "commons-lang3" % "3.9",
-      "org.antlr" % "antlr4" % "4.8",
-      "org.antlr" % "antlr4-tool" % "4.8",
-      "org.antlr" % "antlr4-runtime" % "4.8"
-)
+      "org.apache.commons" % "commons-lang3" % "3.12.0"
+    )
   )
 
 lazy val `datalake-test-utils` = (project in file("datalake-test-utils"))
@@ -46,7 +43,7 @@ lazy val `datalake-test-utils` = (project in file("datalake-test-utils"))
     libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % spark3Version % Provided,
       "org.apache.spark" %% "spark-sql" % spark3Version % Provided,
-      "io.delta" %% "delta-core" % deltaVersion % Provided,
+      "io.delta" %% "delta-spark" % deltaVersion % Provided,
       "org.scalatest" %% "scalatest" % scalatestVersion % Test
     )
   )
@@ -58,29 +55,27 @@ lazy val `datalake-spark3` = (project in file("datalake-spark3"))
     libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % spark3Version % Provided,
       "org.apache.spark" %% "spark-sql" % spark3Version % Provided,
-      "io.delta" %% "delta-core" % deltaVersion % Provided,
+      "io.delta" %% "delta-spark" % deltaVersion % Provided,
       "org.elasticsearch" %% "elasticsearch-spark-30" % elasticsearchVersion % Provided,
       "com.github.pureconfig" %% "pureconfig" % pureconfigVersion,
       "org.typelevel" %% "cats-core" % catsVersion,
-      "io.projectglow" %% "glow-spark3" % glowVersion % Provided exclude("org.apache.hadoop", "hadoop-client"),
-      "com.microsoft.sqlserver" % "mssql-jdbc" % "8.4.1.jre8" % Provided,
+      "io.projectglow" %% "glow-spark3" % glowVersion % Provided exclude("org.apache.hadoop", "hadoop-client") exclude("io.netty","netty-all") exclude("io.netty","netty-handler") exclude("io.netty","netty-transport-native-epoll"),
+      "com.microsoft.sqlserver" % "mssql-jdbc" % "8.4.1.jre11" % Provided,
       "com.microsoft.aad" % "adal4j" % "0.0.2" % Provided,
-      "com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.1.0" % Provided,
-      "com.crealytics" %% "spark-excel" % s"${spark3Version}_0.20.3" % Provided,
+      "com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.2.0" % Provided,
+      "com.crealytics" %% "spark-excel" % "3.5.0_0.20.3" % Provided,
       //Use by ElasticsearchClient
-      "com.softwaremill.sttp.client3" %% "core" % "3.8.15",
-      "com.softwaremill.sttp.client3" %% "json4s" % "3.8.15" exclude("org.json4s", "json4s-core_2.12"), //Exclusion because json4s is used in spark
-      "com.softwaremill.sttp.client3" %% "slf4j-backend" % "3.8.15",
-      "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.17" % Test,
-      "com.dimafeng" %% "testcontainers-scala-elasticsearch" % "0.40.17" % Test,
+      "com.softwaremill.sttp.client3" %% "core" % "3.9.2",
+      "com.softwaremill.sttp.client3" %% "json4s" % "3.9.2" exclude("org.json4s", "json4s-core_2.12"), //Exclusion because json4s is used in spark
+      "com.softwaremill.sttp.client3" %% "slf4j-backend" % "3.9.2",
+      "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.41.0" % Test,
+      "com.dimafeng" %% "testcontainers-scala-elasticsearch" % "0.41.2" % Test,
       "org.scalatest" %% "scalatest" % scalatestVersion % Test,
       "org.apache.spark" %% "spark-hive" % spark3Version % Test,
     ),
     dependencyOverrides ++= Seq(
-      "org.apache.commons" % "commons-lang3" % "3.9",
-      "org.antlr" % "antlr4-runtime" % "4.8",
-      "org.antlr" % "antlr4-tool" % "4.7.1",
+      "org.apache.commons" % "commons-lang3" % "3.12.0"
     ),
   )
   .dependsOn(`datalake-commons`)
diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/GenomicImplicits.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/GenomicImplicits.scala
index 599baa24..4aec8f8e 100644
--- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/GenomicImplicits.scala
+++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/GenomicImplicits.scala
@@ -14,6 +14,7 @@ import scala.collection.immutable
 object GenomicImplicits {
 
   val log: slf4j.Logger = slf4j.LoggerFactory.getLogger(getClass.getCanonicalName)
+
   implicit class GenomicOperations(df: DataFrame) {
 
     def joinAndMerge(other: DataFrame, outputColumnName: String, joinType: String = "inner"): DataFrame = {
@@ -788,7 +789,7 @@
                       colName: String,
                       alias: String,
                       colType: String = "string"): Column =
-    (if (df.columns.contains(colName)) col(colName) else lit(null).cast(colType)).as(alias)
+    (if (df.columns.contains(colName)) col(colName).cast(colType).as(alias) else lit(null).cast(colType)).as(alias)
 
   //the order matters, do not change it
   val locusColumnNames: List[String] = List("chromosome", "start", "reference", "alternate")
@@ -800,7 +801,7 @@
 
   /**
    * Reads vcf files into dataframe and apply transformations:
-   * - split_multiallelics
+   * - optionally split multiallelics variants
    * - optionally normalize_variants if a path to a reference genome is given
    *
    * @param input where the vcf files are located
@@ -808,7 +809,7 @@
    * @param spark a Spark session
    * @return data into a dataframe
    */
-  def vcf(input: String, referenceGenomePath: Option[String], optional: Boolean)(implicit spark: SparkSession): DataFrame = {
+  def vcf(input: String, referenceGenomePath: Option[String], optional: Boolean, split: Boolean)(implicit spark: SparkSession): DataFrame = {
     try {
       val inputs = input.split(",")
       val df = spark.read
@@ -817,10 +818,10 @@
         .load(inputs: _*)
         .withColumnRenamed("filters", "INFO_FILTERS") // Avoid losing filters columns before split
         .withColumn("alternates", col("alternateAlleles")) // Keep a copy of the list of alternate alleles after split
-        .withSplitMultiAllelic
+      val dfSplit = if(split) df.withSplitMultiAllelic else df
       referenceGenomePath
         .fold(
-          df.withColumn("normalizationStatus",
+          dfSplit.withColumn("normalizationStatus",
             struct(
               lit(false) as "changed",
               lit(null).cast(StringType) as "errorMessage"))
@@ -835,15 +836,15 @@
   }
 
   def vcf(input: String, referenceGenomePath: Option[String])(implicit spark: SparkSession): DataFrame = {
-    vcf(input, referenceGenomePath, optional = false)
+    vcf(input, referenceGenomePath, optional = false, split = false)
   }
 
-  def vcf(files: List[String], referenceGenomePath: Option[String], optional: Boolean)(implicit spark: SparkSession): DataFrame = {
-    vcf(files.mkString(","), referenceGenomePath, optional)
+  def vcf(files: List[String], referenceGenomePath: Option[String], optional: Boolean, split:Boolean)(implicit spark: SparkSession): DataFrame = {
+    vcf(files.mkString(","), referenceGenomePath, optional, split)
   }
 
   def vcf(files: List[String], referenceGenomePath: Option[String])(implicit spark: SparkSession): DataFrame = {
-    vcf(files.mkString(","), referenceGenomePath, optional = false)
+    vcf(files.mkString(","), referenceGenomePath, optional = false, split = false)
   }
 
 }
diff --git a/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/GenomicImplicitsSpec.scala b/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/GenomicImplicitsSpec.scala
index 66ac91af..2d241c61 100644
--- a/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/GenomicImplicitsSpec.scala
+++ b/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/GenomicImplicitsSpec.scala
@@ -855,13 +855,13 @@ class GenomicImplicitsSpec extends SparkSpec {
   }
 
   it should "return an empty DataFrame if optional VCF is missing" in {
-    val df = vcf(List("f1", "f2"), None, optional = true)
+    val df = vcf(List("f1", "f2"), None, optional = true, split = false)
     df shouldEqual spark.emptyDataFrame
   }
 
   it should "throw an exception if VCF is missing when not optional" in {
     val exception = intercept[AnalysisException] {
-      vcf(List("f1", "f2"), None, optional = false)
+      vcf(List("f1", "f2"), None, optional = false, split = false)
     }
     exception.getMessage should include("Path does not exist:")