Skip to content

Commit

Permalink
documented GnocchiSession (bigdatagenomics#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunalgosar authored and nathanielparke committed Oct 24, 2017
1 parent 1637917 commit 76020f8
Showing 1 changed file with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,29 @@ object GnocchiSession {
new GnocchiSession(sc)
}

/**
* The GnocchiSession provides functions on top of a SparkContext for loading and
* analyzing genome data.
*
* @param sc The SparkContext to wrap.
*/
class GnocchiSession(@transient val sc: SparkContext) extends Serializable with Logging {

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

/**
* Returns a filtered Dataset of CalledVariant objects, where all values with
* fewer samples than the mind threshold are filtered out.
*
* @param genotypes The Dataset of CalledVariant objects to filter on
* @param mind The percentage threshold of samples to have filled in; values
* with fewer samples will be removed in this operation.
* @param ploidy The number of sets of chromosomes
*
* @return Returns an updated Dataset with values removed, as specified by the
* filtering
*/
def filterSamples(genotypes: Dataset[CalledVariant], mind: Double, ploidy: Double): Dataset[CalledVariant] = {
require(mind >= 0.0 && mind <= 1.0, "`mind` value must be between 0.0 to 1.0 inclusive.")
val sampleIds = genotypes.first.samples.map(x => x.sampleID)
Expand All @@ -50,6 +68,19 @@ class GnocchiSession(@transient val sc: SparkContext) extends Serializable with
genotypes.drop("samples").join(filteredDF, "uniqueID").as[CalledVariant]
}

/**
* Returns a filtered Dataset of CalledVariant objects, where all variants with
* values less than the specified geno or maf threshold are filtered out.
*
* @param genotypes The Dataset of CalledVariant objects to filter on
* @param geno The percentage threshold for geno values for each CalledVariant
* object
* @param maf The percentage threshold for Minor Allele Frequency for each
* CalledVariant object
*
* @return Returns an updated Dataset with values removed, as specified by the
* filtering
*/
def filterVariants(genotypes: Dataset[CalledVariant], geno: Double, maf: Double): Dataset[CalledVariant] = {
require(maf >= 0.0 && maf <= 1.0, "`maf` value must be between 0.0 to 1.0 inclusive.")
require(geno >= 0.0 && geno <= 1.0, "`geno` value must be between 0.0 to 1.0 inclusive.")
Expand All @@ -62,6 +93,15 @@ class GnocchiSession(@transient val sc: SparkContext) extends Serializable with
mafFiltered
}

/**
* Returns a modified Dataset of CalledVariant objects, where any value with a
* maf > 0.5 is recoded. The recoding is specified as flipping the referenceAllele
* and alternateAllele when the frequency of alt is greater than that of ref.
*
* @param genotypes The Dataset of CalledVariant objects to recode
*
* @return Returns an updated Dataset that has been recoded
*/
def recodeMajorAllele(genotypes: Dataset[CalledVariant]): Dataset[CalledVariant] = {
val minorAlleleF = genotypes.map(x => (x.uniqueID, x.maf)).toDF("uniqueID", "maf")
val genoWithMaf = genotypes.join(minorAlleleF, "uniqueID")
Expand Down Expand Up @@ -117,6 +157,23 @@ class GnocchiSession(@transient val sc: SparkContext) extends Serializable with
}).toDS
}

/**
* Returns a map of phenotype name to phenotype object, which is loaded from
* a file, specified by phenotypesPath
*
* @param phenotypesPath A string specifying the location in the file system
* of the phenotypes file to load in.
* @param primaryID The primary sample ID
* @param phenoName The primary phenotype
* @param delimiter The delimiter used in the input file
* @param covarPath Optional parameter specifying the location in the file
* system of the covariants file
* @param covarNames Optional paramter specifying the names of the covariants
* detailed in the covariants file
* @param covarDelimiter The delimiter used in the covariants file
*
* @return A Map of phenotype name to Phenotype object
*/
def loadPhenotypes(phenotypesPath: String,
primaryID: String,
phenoName: String,
Expand Down Expand Up @@ -184,4 +241,4 @@ class GnocchiSession(@transient val sc: SparkContext) extends Serializable with

phenoCovarDF.withColumn("phenoName", lit(phenoName)).as[Phenotype].collect().map(x => (x.sampleId, x)).toMap
}
}
}

0 comments on commit 76020f8

Please sign in to comment.