Merge pull request #260 from Ferlab-Ste-Justine/fix/clin-2536

fix: CLIN-2536 flattenInfo to manage arrayType

meek0 authored Jan 13, 2025
2 parents 0a41e24 + eff9d8a commit 2f04ec4

Showing 2 changed files with 19 additions and 15 deletions.
@@ -6,7 +6,7 @@ import bio.ferlab.datalake.spark3.implicits.SparkUtils.isNestedFieldExists
 import io.projectglow.Glow
 import org.apache.spark.sql.expressions.{Window, WindowSpec}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{DoubleType, StringType}
+import org.apache.spark.sql.types.{ArrayType, DoubleType, StringType}
 import org.apache.spark.sql._
 import org.slf4j

@@ -646,10 +646,15 @@ object GenomicImplicits {

     val dp: Column = col("INFO_DP") as "dp"

-    def flattenInfo(df: DataFrame, except: String*): Seq[Column] =
-      df.columns.filterNot(except.contains(_)).collect {
-        case c if c.startsWith("INFO_") => col(c)(0) as c.replace("INFO_", "").toLowerCase
+    def flattenInfo(df: DataFrame, except: String*): Seq[Column] = {
+      df.columns.collect { case c if !except.contains(c) && c.startsWith("INFO_") =>
+        val newName = c.stripPrefix("INFO_").toLowerCase
+        df.schema(c).dataType match {
+          case _: ArrayType => col(c)(0).as(newName)
+          case _ => col(c).as(newName)
+        }
       }
+    }


     def familyInfo(cols: Seq[Column] = Seq(col("calls"), col("affected_status"), col("gq")),
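For readers skimming the diff: the rewritten flattenInfo now inspects each INFO_ column's Spark data type instead of always indexing it. Array-typed INFO fields keep the previous behaviour (take the first element), while scalar fields are passed through and only renamed. Below is a minimal, self-contained sketch of the same logic; the SparkSession setup and the sample INFO_AF / INFO_DP columns are illustrative and not part of this commit.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.ArrayType

object FlattenInfoSketch {

  // Same shape as the patched method: scalar INFO_* columns are renamed as-is,
  // array INFO_* columns contribute their first element.
  def flattenInfo(df: DataFrame, except: String*): Seq[Column] =
    df.columns.collect { case c if !except.contains(c) && c.startsWith("INFO_") =>
      val newName = c.stripPrefix("INFO_").toLowerCase
      df.schema(c).dataType match {
        case _: ArrayType => col(c)(0).as(newName)
        case _            => col(c).as(newName)
      }
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("flattenInfo-sketch").getOrCreate()
    import spark.implicits._

    // INFO_AF is array-typed (a Number=A style field), INFO_DP is a plain integer.
    val df = Seq((Seq(0.01, 0.02), 35)).toDF("INFO_AF", "INFO_DP")

    df.select(flattenInfo(df): _*).show()
    // Expected output: one row with af = 0.01 (first element of INFO_AF) and dp = 35.

    spark.stop()
  }
}
```

Keeping the except varargs means callers can still exclude specific INFO fields from flattening, exactly as before.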
@@ -2,9 +2,8 @@ package bio.ferlab.datalake.spark3.publictables.normalized.gnomad

 import bio.ferlab.datalake.commons.config.{DatasetConf, RepartitionByRange, RuntimeETLContext}
 import bio.ferlab.datalake.spark3.etl.v4.SimpleETLP
 import bio.ferlab.datalake.spark3.implicits.DatasetConfImplicits._
 import bio.ferlab.datalake.spark3.implicits.GenomicImplicits.columns._
-import io.projectglow.Glow
+import bio.ferlab.datalake.spark3.implicits.GenomicImplicits.vcf
 import mainargs.{ParserForMethods, main}
 import org.apache.spark.sql.DataFrame

@@ -17,8 +16,8 @@ case class GnomadV4(rc: RuntimeETLContext) extends SimpleETLP(rc) {

   override def extract(lastRunValue: LocalDateTime = minValue,
                        currentRunValue: LocalDateTime = LocalDateTime.now()): Map[String, DataFrame] = {
-    val sess = Glow.register(spark)
-    Map(gnomad_vcf.id -> sess.read.format("vcf").load(gnomad_vcf.location))
+
+    Map(gnomad_vcf.id -> vcf(gnomad_vcf.location, None))
   }

   override def transformSingle(data: Map[String, DataFrame],
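On the extract side, the override no longer registers Glow and builds the VCF reader inline; it now delegates to the vcf helper imported from GenomicImplicits. That helper's real implementation is not part of this diff. The sketch below is only an inference from the call site vcf(gnomad_vcf.location, None) and from the two removed lines, so the signature and the meaning of the second argument are assumptions.

```scala
import io.projectglow.Glow
import org.apache.spark.sql.{DataFrame, SparkSession}

object VcfHelperSketch {
  // Hypothetical reconstruction of a shared `vcf` helper: register Glow once and
  // read the VCF path, keeping that boilerplate out of each ETL's extract().
  // The optional second parameter is assumed from the call site and ignored in this sketch.
  def vcf(input: String, referenceGenomePath: Option[String])(implicit spark: SparkSession): DataFrame = {
    val session = Glow.register(spark) // same registration the old extract() did inline
    session.read.format("vcf").load(input)
  }
}
```

Whatever its exact shape, moving the Glow registration behind a shared helper keeps each ETL's extract to a single line describing which dataset it reads.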
@@ -31,13 +30,13 @@ case class GnomadV4(rc: RuntimeETLContext) extends SimpleETLP(rc) {

     val intermediate = df
       .select(
         chromosome +:
-        start +:
-        end +:
-        reference +:
-        alternate +:
-        $"qual" +:
-        name +:
-        flattenInfo(df): _*
+          start +:
+          end +:
+          reference +:
+          alternate +:
+          $"qual" +:
+          name +:
+          flattenInfo(df): _*
       )

     intermediate.select(
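Why the type check matters for the select above: Glow types each INFO field from the VCF header, so a Number=1 field typically arrives as a plain scalar column, and indexing it with col(c)(0) (the old flattenInfo behaviour) would normally fail analysis rather than flatten anything. A small illustration of the difference, using a made-up scalar INFO_DP column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ScalarInfoIllustration extends App {
  val spark = SparkSession.builder().master("local[1]").appName("scalar-info").getOrCreate()
  import spark.implicits._

  // A scalar INFO column, as Glow would produce for a Number=1 VCF field (illustrative).
  val df = Seq(35).toDF("INFO_DP")

  // Old behaviour: index into the column regardless of its type.
  // df.select(col("INFO_DP")(0) as "dp").show()  // fails analysis: INFO_DP is not an array/map/struct

  // New behaviour: a scalar column is simply renamed.
  df.select(col("INFO_DP") as "dp").show() // one row: dp = 35

  spark.stop()
}
```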