Skip to content

Commit

Permalink
feat(mi): add TRGT MI calculator (based on allele lengths)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlougheed committed Jul 16, 2024
1 parent 0d55a92 commit ee3e6e2
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 2 deletions.
2 changes: 2 additions & 0 deletions strkit/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
CALLER_STRKIT_JSON = "strkit-json"
CALLER_STRKIT_VCF = "strkit-vcf"
CALLER_TANDEM_GENOTYPES = "tandem-genotypes"
CALLER_TRGT = "trgt"

CALL_SUPPORTED_CALLERS = (
CALLER_REPEATHMM,
Expand Down Expand Up @@ -58,4 +59,5 @@
CALLER_STRKIT_JSON,
CALLER_STRKIT_VCF,
CALLER_TANDEM_GENOTYPES,
CALLER_TRGT,
)
2 changes: 2 additions & 0 deletions strkit/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ def _exec_mi(p_args) -> None:
from strkit.mi.straglr import StraglrCalculator
from strkit.mi.strkit import StrKitCalculator, StrKitJSONCalculator, StrKitVCFCalculator
from strkit.mi.tandem_genotypes import TandemGenotypesCalculator
from strkit.mi.trgt import TRGTCalculator

calc_classes: dict[str, Type[BaseCalculator]] = {
c.CALLER_EXPANSIONHUNTER: ExpansionHunterCalculator,
Expand All @@ -391,6 +392,7 @@ def _exec_mi(p_args) -> None:
c.CALLER_STRKIT_JSON: StrKitJSONCalculator,
c.CALLER_STRKIT_VCF: StrKitVCFCalculator,
c.CALLER_TANDEM_GENOTYPES: TandemGenotypesCalculator,
c.CALLER_TRGT: TRGTCalculator,
}

caller = p_args.caller.lower()
Expand Down
106 changes: 106 additions & 0 deletions strkit/mi/trgt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from __future__ import annotations

import pysam

from typing import Union

from .base import BaseCalculator
from .result import MIContigResult, MILocusData
from .vcf_utils import VCFCalculatorMixin
from ..utils import parse_ci

__all__ = ["TRGTCalculator"]


def _parse_allele(a: Union[int, str, None]) -> int | None:
if isinstance(a, str):
if a == ".":
return None
return int(a)
return a


def _unzip_gt(
vals, motif_len: int
) -> Union[
tuple[tuple[float, ...], tuple[tuple[float, ...], tuple[float, ...]]],
tuple[tuple[None, None], tuple[None, None]],
]:
try:
return (
(
_parse_allele(vals[0][0]) / motif_len,
_parse_allele(vals[1][0]) / motif_len,
),
(
tuple(map(lambda x: x / motif_len, parse_ci(vals[0][1]))),
tuple(map(lambda x: x / motif_len, parse_ci(vals[1][1]))),
),
)
except ValueError:
return (None, None), (None, None)


class TRGTCalculator(BaseCalculator, VCFCalculatorMixin):
def _get_sample_contigs(self) -> tuple[set, set, set]:
return self.get_contigs_from_files(self._mother_call_file, self._father_call_file, self._child_call_file)

def calculate_contig(self, contig: str) -> MIContigResult:
cr = MIContigResult(contig, includes_95_ci=True)

mvf = pysam.VariantFile(str(self._mother_call_file))
fvf = pysam.VariantFile(str(self._father_call_file))
cvf = pysam.VariantFile(str(self._child_call_file))

# We want all common loci, so loop through the child and then look for the loci in the parent calls

for cv in cvf.fetch(contig):
mv = next(mvf.fetch(contig, cv.start, cv.stop), None)
fv = next(fvf.fetch(contig, cv.start, cv.stop), None)

# TODO: Handle sex chromosomes

k = (contig, cv.start, cv.stop)

if self.should_skip_locus(*k):
continue

cr.seen_locus(*k)

if mv is None or fv is None:
# Variant isn't found in at least one of the parents, so we can't do anything with it.
# TODO: We need to actually check calls, and check with sample ID, not just assume
continue

# TODO: Handle missing samples gracefully
# TODO: Handle wrong formatted VCFs gracefully

motif = cv.info["MOTIFS"].split(",")[0]

cs = cv.samples[self._child_id or 0]
ms = mv.samples[self._mother_id or 0]
fs = fv.samples[self._father_id or 0]

cs_reps = tuple(sorted(zip(cs["AL"].split(","), cs["ALLR"].split(",")), key=lambda x: x[0]))
ms_reps = tuple(sorted(zip(ms["AL"].split(","), ms["ALLR"].split(",")), key=lambda x: x[0]))
fs_reps = tuple(sorted(zip(fs["AL"].split(","), fs["ALLR"].split(",")), key=lambda x: x[0]))

c_gt, c_gt_95_ci = _unzip_gt(cs_reps, len(motif))
m_gt, m_gt_95_ci = _unzip_gt(ms_reps, len(motif))
f_gt, f_gt_95_ci = _unzip_gt(fs_reps, len(motif))

if c_gt[0] is None or m_gt[0] is None or f_gt[0] is None:
# None call in VCF, skip this call
continue

cr.append(MILocusData(
contig=contig,
start=cv.pos,
end=cv.stop,
motif=motif,

child_gt=c_gt, mother_gt=m_gt, father_gt=f_gt,
child_gt_95_ci=c_gt_95_ci, mother_gt_95_ci=m_gt_95_ci, father_gt_95_ci=f_gt_95_ci,
))

return cr
12 changes: 10 additions & 2 deletions strkit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"apply_or_none",
"int_tuple",
"float_tuple",
"parse_ci",
"parse_cis",
"cis_overlap",
"sign",
Expand All @@ -34,8 +35,15 @@ def float_tuple(x: Iterable) -> tuple[float, ...]:
return tuple(map(float, x))


def parse_cis(cis, commas=False, dtype=int):
return tuple(map(lambda ci: tuple(map(dtype, ci.split("," if commas else "-"))), cis))
def parse_ci(ci: str, commas=False, dtype=int) -> Union[tuple[int, int], tuple[float, float]]:
ci_s = ci.split("," if commas else "-")
return dtype(ci_s[0]), dtype(ci_s[1])


def parse_cis(
cis: Iterable[str], commas=False, dtype=int
) -> Union[tuple[tuple[int, ...], ...], tuple[tuple[float, ...], ...]]:
return tuple(map(lambda ci: parse_ci(ci, commas, dtype), cis))


def cis_overlap(ci1, ci2) -> bool:
Expand Down

0 comments on commit ee3e6e2

Please sign in to comment.