Skip to content

Commit

Permalink
Merge pull request #245 from Ferlab-Ste-Justine/feat/clin-2536
Browse files Browse the repository at this point in the history
feat: CLIN-2536 normalize gnomad 4
  • Loading branch information
meek0 authored Dec 3, 2024
2 parents 3462b55 + 57e6312 commit 4ccef86
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 15 deletions.
20 changes: 20 additions & 0 deletions datalake-spark3/src/main/resources/reference_kf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ datalake {
"valid_to_column"="valid_to"
}
},
{
format=VCF
id="raw_gnomad_genomes_v4"
keys=[]
loadtype=OverWrite
partitionby=[]
path="/release/4.1/vcf/genomes/gnomad.genomes.v4.1.sites.chr[^M]*.vcf.bgz"
readoptions {
flattenInfoFields="true"
"split_multiallelics"="true"
}
storageid=gnomadv4
writeoptions {
"created_on_column"="created_on"
"is_current_column"="is_current"
"updated_on_column"="updated_on"
"valid_from_column"="valid_from"
"valid_to_column"="valid_to"
}
},
{
format=VCF
id="raw_gnomad_genomes_v3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ object ImportPublicTable {
/** Command-line sub-command that delegates to `GnomadV3.run` with the given runtime context. */
@main
def gnomadv3(rc: RuntimeETLContext): Unit = {
  GnomadV3.run(rc)
}

/** Command-line sub-command that delegates to `GnomadV4.run` with the given runtime context. */
@main
def gnomadv4(rc: RuntimeETLContext): Unit = {
  GnomadV4.run(rc)
}

/** Command-line sub-command that delegates to `GnomadConstraint.run` with the given runtime context. */
@main
def gnomad_constraint(rc: RuntimeETLContext): Unit = {
  GnomadConstraint.run(rc)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ case class PublicDatasets(alias: String, tableDatabase: Option[String], viewData
DatasetConf("raw_clinvar" , alias, "/raw/landing/clinvar/clinvar.vcf.gz" , VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")),
DatasetConf("raw_dbsnp" , alias, "/raw/landing/dbsnp/GCF_000001405.40.gz" , VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")),
DatasetConf("raw_gnomad_genomes_v3" , alias, "/release/3.1/vcf/genomes/gnomad.genomes.v3.1.sites.chr[^M]*.vcf.bgz", VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")).copy(storageid = gnomadStorageId),
DatasetConf("raw_gnomad_genomes_v4" , alias, "/release/4.1/vcf/genomes/gnomad.genomes.v4.1.sites.chr[^M]*.vcf.bgz", VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")).copy(storageid = gnomadStorageId),
DatasetConf("raw_gnomad_constraint_v2_1_1" , alias, "/raw/landing/gnomad_v2_1_1/gnomad.v2.1.1.lof_metrics.by_gene.txt.gz", CSV , OverWrite , readoptions = Map("header" -> "true", "sep" -> "\t")),
DatasetConf("raw_topmed_bravo" , alias, "/raw/landing/topmed/bravo-dbsnp-*.vcf.gz" , VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")),
DatasetConf("raw_1000_genomes" , alias, "/raw/landing/1000Genomes/ALL.*.sites.vcf.gz" , VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,6 @@ case class GnomadV3(rc: RuntimeETLContext) extends SimpleETLP(rc) {
)
}

/**
 * Builds the projection for every `INFO_*` column of `df`.
 *
 * Each output column is renamed by removing every occurrence of `INFO_` from the
 * original name and lower-casing the result. Array-typed columns whose name starts
 * with `INFO_AN`, `INFO_AC`, `INFO_AF` or `INFO_nhomalt` are reduced to their first
 * element; any other `INFO_*` column is passed through unchanged. Columns that match
 * neither rule are left out of the result.
 *
 * @param df data frame whose schema is inspected
 * @return the renamed column expressions, in schema order
 */
private def flattenInfo(df: DataFrame): Seq[Column] = {
  // Prefixes of the columns that are collapsed to element 0 when they are arrays.
  val firstElementPrefixes = List("INFO_AN", "INFO_AC", "INFO_AF", "INFO_nhomalt")

  def outputName(original: String): String =
    original.replace("INFO_", "").toLowerCase

  df.schema.toList.collect {
    case field
      if field.dataType.isInstanceOf[ArrayType] &&
        firstElementPrefixes.exists(prefix => field.name.startsWith(prefix)) =>
      col(field.name)(0) as outputName(field.name)
    case field if field.name.startsWith("INFO_") =>
      col(field.name) as outputName(field.name)
  }
}

// Repartitioning applied by default: a RepartitionByRange configured on the
// "chromosome" and "start" columns with n = 1000.
// NOTE(review): presumably n is the target number of partitions — confirm against RepartitionByRange.
override val defaultRepartition: DataFrame => DataFrame = RepartitionByRange(columnNames = Seq("chromosome", "start"), n = Some(1000))


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package bio.ferlab.datalake.spark3.publictables.normalized.gnomad

import bio.ferlab.datalake.commons.config.{DatasetConf, RepartitionByRange, RuntimeETLContext}
import bio.ferlab.datalake.spark3.etl.v4.SimpleETLP
import bio.ferlab.datalake.spark3.implicits.DatasetConfImplicits._
import bio.ferlab.datalake.spark3.implicits.GenomicImplicits.columns._
import mainargs.{ParserForMethods, main}
import org.apache.spark.sql.DataFrame

import java.time.LocalDateTime

/**
 * ETL that reads the `raw_gnomad_genomes_v4` dataset and produces the
 * `normalized_gnomad_genomes_v4` dataset.
 *
 * @param rc runtime context handed to the parent `SimpleETLP`
 */
case class GnomadV4(rc: RuntimeETLContext) extends SimpleETLP(rc) {

  override val mainDestination: DatasetConf = conf.getDataset("normalized_gnomad_genomes_v4")
  val gnomad_vcf: DatasetConf = conf.getDataset("raw_gnomad_genomes_v4")

  /** Reads the raw dataset and exposes it keyed by its dataset id. */
  override def extract(lastRunValue: LocalDateTime = minValue,
                       currentRunValue: LocalDateTime = LocalDateTime.now()): Map[String, DataFrame] =
    Map(gnomad_vcf.id -> gnomad_vcf.read)

  /**
   * Projects the raw data frame down to the normalized columns.
   *
   * A first projection keeps the variant columns (`chromosome`, `start`, `end`,
   * `reference`, `alternate`, `qual`, `name`) together with the columns returned by
   * `flattenInfo`. A second projection keeps only the final columns, casting `ac`,
   * `an` and `nhomalt` to `long` and exposing `nhomalt` as `hom`.
   *
   * @param data extracted data frames, looked up by the raw dataset id
   * @return the normalized data frame
   */
  override def transformSingle(data: Map[String, DataFrame],
                               lastRunValue: LocalDateTime = minValue,
                               currentRunValue: LocalDateTime = LocalDateTime.now()): DataFrame = {
    import spark.implicits._

    val rawVariants = data(gnomad_vcf.id)

    // Columns placed ahead of the flattened INFO columns.
    val variantColumns = Seq(chromosome, start, end, reference, alternate, $"qual", name)

    // NOTE(review): flattenInfo is not defined in this file; presumably it comes from
    // the GenomicImplicits.columns import — confirm.
    val withInfoColumns = rawVariants.select(variantColumns ++ flattenInfo(rawVariants): _*)

    withInfoColumns.select(
      $"chromosome",
      $"start",
      $"end",
      $"reference",
      $"alternate",
      $"qual",
      $"name",
      $"ac".cast("long"),
      $"af",
      $"an".cast("long"),
      $"nhomalt".cast("long").as("hom")
    )
  }

  // Default repartitioning: RepartitionByRange on "chromosome" and "start" with n = 1000.
  override val defaultRepartition: DataFrame => DataFrame =
    RepartitionByRange(columnNames = Seq("chromosome", "start"), n = Some(1000))

}

/** Companion exposing the command-line entry points for the `GnomadV4` ETL. */
object GnomadV4 {

  /** Builds a `GnomadV4` ETL from the runtime context and runs it. */
  @main
  def run(rc: RuntimeETLContext): Unit = GnomadV4(rc).run()

  /** Parses `args` with mainargs and dispatches to the matching `@main` method, throwing on failure. */
  def main(args: Array[String]): Unit = {
    ParserForMethods(this).runOrThrow(args)
  }
}
20 changes: 20 additions & 0 deletions datalake-spark3/src/test/resources/config/reference_kf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ datalake {
"valid_to_column"="valid_to"
}
},
{
format=VCF
id="raw_gnomad_genomes_v4"
keys=[]
loadtype=OverWrite
partitionby=[]
path="/release/4.1/vcf/genomes/gnomad.genomes.v4.1.sites.chr[^M]*.vcf.bgz"
readoptions {
flattenInfoFields="true"
"split_multiallelics"="true"
}
storageid=gnomadv4
writeoptions {
"created_on_column"="created_on"
"is_current_column"="is_current"
"updated_on_column"="updated_on"
"valid_from_column"="valid_from"
"valid_to_column"="valid_to"
}
},
{
format=VCF
id="raw_gnomad_genomes_v3"
Expand Down
Loading

0 comments on commit 4ccef86

Please sign in to comment.