Merge pull request #225 from Ferlab-Ste-Justine/bg-33-spark-3.5.1
Spark 3.5.1
jecos authored Mar 18, 2024
2 parents a7f7d4d + b058707 commit 8406b27
Showing 3 changed files with 34 additions and 38 deletions.
49 changes: 22 additions & 27 deletions build.sbt
@@ -10,34 +10,31 @@ lazy val supportedScalaVersions = List(scala212)
scalacOptions ++= Seq("-deprecation", "-Ypartial-unification")
javacOptions ++= Seq("-source", "11", "-target", "11", "-Xlint")

val spark3Version = "3.4.2"
val catsVersion = "2.7.0"
val scalatestVersion = "3.2.12"
val pureconfigVersion = "0.17.2"
val spark3Version = "3.5.1"
val catsVersion = "2.10.0"
val scalatestVersion = "3.2.17"
val pureconfigVersion = "0.17.6"
val elasticsearchVersion = "7.15.0"
val deltaVersion = "2.4.0"
val glowVersion = "1.2.1"
val deltaVersion = "3.1.0"
val glowVersion = "2.0.0"

updateOptions := updateOptions.value.withGigahorse(false)

lazy val `datalake-commons` = (project in file("datalake-commons"))
.settings(
scalaVersion := scala212,
libraryDependencies += "com.github.pureconfig" %% "pureconfig" % pureconfigVersion,
libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.9.0",
libraryDependencies += "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
libraryDependencies += "org.typelevel" %% "cats-core" % catsVersion,
libraryDependencies += "org.scalatest" %% "scalatest" % scalatestVersion,
libraryDependencies += "io.projectglow" %% "glow-spark3" % glowVersion % Provided exclude("org.apache.hadoop", "hadoop-client"),
libraryDependencies += "com.outr" %% "hasher" % "1.2.2",
libraryDependencies += "com.lihaoyi"%% "mainargs" % "0.5.0",
libraryDependencies += "com.lihaoyi"%% "mainargs" % "0.6.1",
libraryDependencies += "org.apache.spark" %% "spark-core" % spark3Version % Provided,
libraryDependencies += "org.apache.spark" %% "spark-sql" % spark3Version % Provided,
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-lang3" % "3.9",
"org.antlr" % "antlr4" % "4.8",
"org.antlr" % "antlr4-tool" % "4.8",
"org.antlr" % "antlr4-runtime" % "4.8"
)
"org.apache.commons" % "commons-lang3" % "3.12.0"
)
)

lazy val `datalake-test-utils` = (project in file("datalake-test-utils"))
@@ -46,7 +43,7 @@ lazy val `datalake-test-utils` = (project in file("datalake-test-utils"))
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % spark3Version % Provided,
"org.apache.spark" %% "spark-sql" % spark3Version % Provided,
"io.delta" %% "delta-core" % deltaVersion % Provided,
"io.delta" %% "delta-spark" % deltaVersion % Provided,
"org.scalatest" %% "scalatest" % scalatestVersion % Test
)
)
@@ -58,29 +55,27 @@ lazy val `datalake-spark3` = (project in file("datalake-spark3"))
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % spark3Version % Provided,
"org.apache.spark" %% "spark-sql" % spark3Version % Provided,
"io.delta" %% "delta-core" % deltaVersion % Provided,
"io.delta" %% "delta-spark" % deltaVersion % Provided,
"org.elasticsearch" %% "elasticsearch-spark-30" % elasticsearchVersion % Provided,
"com.github.pureconfig" %% "pureconfig" % pureconfigVersion,
"org.typelevel" %% "cats-core" % catsVersion,
"io.projectglow" %% "glow-spark3" % glowVersion % Provided exclude("org.apache.hadoop", "hadoop-client"),
"com.microsoft.sqlserver" % "mssql-jdbc" % "8.4.1.jre8" % Provided,
"io.projectglow" %% "glow-spark3" % glowVersion % Provided exclude("org.apache.hadoop", "hadoop-client") exclude("io.netty","netty-all") exclude("io.netty","netty-handler") exclude("io.netty","netty-transport-native-epoll"),
"com.microsoft.sqlserver" % "mssql-jdbc" % "8.4.1.jre11" % Provided,
"com.microsoft.aad" % "adal4j" % "0.0.2" % Provided,
"com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.1.0" % Provided,
"com.crealytics" %% "spark-excel" % s"${spark3Version}_0.20.3" % Provided,
"com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.2.0" % Provided,
"com.crealytics" %% "spark-excel" % "3.5.0_0.20.3" % Provided,
// Used by ElasticsearchClient
"com.softwaremill.sttp.client3" %% "core" % "3.8.15",
"com.softwaremill.sttp.client3" %% "json4s" % "3.8.15" exclude("org.json4s", "json4s-core_2.12"), //Exclusion because json4s is used in spark
"com.softwaremill.sttp.client3" %% "slf4j-backend" % "3.8.15",
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.17" % Test,
"com.dimafeng" %% "testcontainers-scala-elasticsearch" % "0.40.17" % Test,
"com.softwaremill.sttp.client3" %% "core" % "3.9.2",
"com.softwaremill.sttp.client3" %% "json4s" % "3.9.2" exclude("org.json4s", "json4s-core_2.12"), //Exclusion because json4s is used in spark
"com.softwaremill.sttp.client3" %% "slf4j-backend" % "3.9.2",
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.41.0" % Test,
"com.dimafeng" %% "testcontainers-scala-elasticsearch" % "0.41.2" % Test,
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.apache.spark" %% "spark-hive" % spark3Version % Test,

),
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-lang3" % "3.9",
"org.antlr" % "antlr4-runtime" % "4.8",
"org.antlr" % "antlr4-tool" % "4.7.1",
"org.apache.commons" % "commons-lang3" % "3.12.0"
),
)
.dependsOn(`datalake-commons`)
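
For context outside the diff, here is a minimal sketch of how a consumer of these libraries enables Delta Lake after the delta-core to delta-spark artifact rename (Delta 3.1.0 on Spark 3.5.1). The object name, app name, master and output path are illustrative assumptions; the two session settings are the standard Delta Lake configuration and are unchanged by the rename.

import org.apache.spark.sql.SparkSession

object DeltaSessionExample {
  def main(args: Array[String]): Unit = {
    // Delta Lake is wired into Spark through these two settings, the same
    // before and after the delta-core -> delta-spark artifact rename.
    val spark = SparkSession.builder()
      .appName("delta-spark-3.5-example") // illustrative name
      .master("local[*]")                 // illustrative master
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Round-trip a tiny dataset through a Delta table to confirm the wiring.
    spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta-example")
    spark.read.format("delta").load("/tmp/delta-example").show()
    spark.stop()
  }
}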
@@ -14,6 +14,7 @@ import scala.collection.immutable

object GenomicImplicits {
val log: slf4j.Logger = slf4j.LoggerFactory.getLogger(getClass.getCanonicalName)

implicit class GenomicOperations(df: DataFrame) {

def joinAndMerge(other: DataFrame, outputColumnName: String, joinType: String = "inner"): DataFrame = {
@@ -788,7 +789,7 @@ object GenomicImplicits {
colName: String,
alias: String,
colType: String = "string"): Column =
-(if (df.columns.contains(colName)) col(colName) else lit(null).cast(colType)).as(alias)
+(if (df.columns.contains(colName)) col(colName).cast(colType).as(alias) else lit(null).cast(colType)).as(alias)

//the order matters, do not change it
val locusColumnNames: List[String] = List("chromosome", "start", "reference", "alternate")
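
The helper changed in the hunk above has its signature truncated by the diff, so its real name is not visible; the sketch below uses the hypothetical name optionalCol to illustrate the behaviour change: an existing column is now cast to the requested colType instead of keeping its original type, while a missing column still becomes a typed null literal.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical name for the truncated helper shown in the hunk above.
def optionalCol(df: DataFrame, colName: String, alias: String, colType: String = "string"): Column =
  (if (df.columns.contains(colName)) col(colName).cast(colType).as(alias)
   else lit(null).cast(colType)).as(alias)

// After this change, optionalCol(df, "start", "start", "long") yields a long column
// even when "start" exists with another type; previously an existing column kept
// its original type and only the missing case was cast.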
@@ -800,15 +801,15 @@

/**
* Reads vcf files into a dataframe and applies transformations:
-* - split_multiallelics
+* - optionally split multiallelic variants
* - optionally normalize_variants if a path to a reference genome is given
*
* @param input where the vcf files are located
* @param referenceGenomePath reference genome path. This path has to be local to each executor
* @param spark a Spark session
* @return the data as a dataframe
*/
-def vcf(input: String, referenceGenomePath: Option[String], optional: Boolean)(implicit spark: SparkSession): DataFrame = {
+def vcf(input: String, referenceGenomePath: Option[String], optional: Boolean, split: Boolean)(implicit spark: SparkSession): DataFrame = {
try {
val inputs = input.split(",")
val df = spark.read
@@ -817,10 +818,10 @@
.load(inputs: _*)
.withColumnRenamed("filters", "INFO_FILTERS") // Avoid losing filters columns before split
.withColumn("alternates", col("alternateAlleles")) // Keep a copy of the list of alternate alleles after split
-.withSplitMultiAllelic
+val dfSplit = if(split) df.withSplitMultiAllelic else df
referenceGenomePath
.fold(
df.withColumn("normalizationStatus",
dfSplit.withColumn("normalizationStatus",
struct(
lit(false) as "changed",
lit(null).cast(StringType) as "errorMessage"))
@@ -835,15 +836,15 @@
}

def vcf(input: String, referenceGenomePath: Option[String])(implicit spark: SparkSession): DataFrame = {
-vcf(input, referenceGenomePath, optional = false)
+vcf(input, referenceGenomePath, optional = false, split = false)
}

-def vcf(files: List[String], referenceGenomePath: Option[String], optional: Boolean)(implicit spark: SparkSession): DataFrame = {
-vcf(files.mkString(","), referenceGenomePath, optional)
+def vcf(files: List[String], referenceGenomePath: Option[String], optional: Boolean, split:Boolean)(implicit spark: SparkSession): DataFrame = {
+vcf(files.mkString(","), referenceGenomePath, optional, split)
}

def vcf(files: List[String], referenceGenomePath: Option[String])(implicit spark: SparkSession): DataFrame = {
-vcf(files.mkString(","), referenceGenomePath, optional = false)
+vcf(files.mkString(","), referenceGenomePath, optional = false, split = false)
}

}
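
A short usage sketch for the new split flag (assumed caller code, not part of this commit); it assumes vcf is in scope from wherever this loader is defined, and the file paths and reference genome location are illustrative:

import org.apache.spark.sql.{DataFrame, SparkSession}

implicit val spark: SparkSession = SparkSession.builder().getOrCreate()

// Split multiallelic variants and normalize against a locally available reference genome.
val normalized: DataFrame = vcf(
  input = "/data/vcf/sample1.vcf.gz,/data/vcf/sample2.vcf.gz", // illustrative paths
  referenceGenomePath = Some("/refs/GRCh38.fa"),               // illustrative path
  optional = false,
  split = true)

// Existing two-argument callers keep the previous behaviour: no split, not optional.
val raw: DataFrame = vcf("/data/vcf/sample1.vcf.gz", None)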
@@ -855,13 +855,13 @@ class GenomicImplicitsSpec extends SparkSpec {
}

it should "return an empty DataFrame if optional VCF is missing" in {
-val df = vcf(List("f1", "f2"), None, optional = true)
+val df = vcf(List("f1", "f2"), None, optional = true, split = false)
df shouldEqual spark.emptyDataFrame
}

it should "throw an exception if VCF is missing when not optional" in {
val exception = intercept[AnalysisException] {
vcf(List("f1", "f2"), None, optional = false)
vcf(List("f1", "f2"), None, optional = false, split = false)
}
exception.getMessage should include("Path does not exist:")
}