Skip to content

Commit

Permalink
feat(mi): initial implementation of mutation-from-parent
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlougheed committed Oct 7, 2024
1 parent 05a1611 commit 884d568
Showing 1 changed file with 96 additions and 14 deletions.
110 changes: 96 additions & 14 deletions strkit/mi/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from strkit.logger import get_main_logger
from strkit.utils import cat_strs, cis_overlap

from typing import Generator, Iterable, Optional, Union
from typing import Generator, Iterable, Literal, Optional, Union

__all__ = [
"MILocusData",
Expand All @@ -26,7 +26,18 @@

OptionalReadCounts = Optional[tuple[tuple[int, ...], tuple[int, ...]]]

INHERITANCE_CONFIGS = (
ParentInheritanceConfig = tuple[Literal[0, 1], Literal[0, 1]]
InheritanceConfig = tuple[Literal[0, 1], Literal[0, 1], Literal[0, 1], Literal[0, 1]]

# Only the parental parts of the below INHERITANCE_CONFIGS constant - which alleles were inherited from each parent.
PARENT_INHERITANCE_CONFIGS: tuple[ParentInheritanceConfig, ...] = (
(0, 0), # maternal 0 allele, paternal 0 allele
(0, 1), # maternal 0 allele, paternal 1 allele
(1, 0), # maternal 1 allele, paternal 0 allele
(1, 1), # maternal 1 allele, paternal 1 allele
)

INHERITANCE_CONFIGS: tuple[InheritanceConfig, ...] = (
(0, 1, 0, 0), # allele 0 maternal, allele 1 paternal, maternal 0 allele, paternal 0 allele
(0, 1, 0, 1), # allele 0 maternal, allele 1 paternal, maternal 0 allele, paternal 1 allele
(0, 1, 1, 0), # allele 0 maternal, allele 1 paternal, maternal 1 allele, paternal 0 allele
Expand All @@ -38,6 +49,8 @@
(1, 0, 1, 1), # allele 1 maternal, allele 0 paternal, maternal 1 allele, paternal 1 allele
)

MutationFrom = Literal["none", "?", "mat", "pat", "both"]


class MILocusData:
def __init__(
Expand All @@ -60,6 +73,7 @@ def __init__(
widen: float = 0,

test_to_perform: str = "none",
sig_level: float = 0.05,

logger: Optional[logging.Logger] = None,
):
Expand Down Expand Up @@ -90,10 +104,14 @@ def __init__(
self._decimal_threshold: float = 0.5
self._widen: float = widen

# --- begin de novo mutation-related fields/initialization ---
self._p_value = None
self._adj_p_value = None
if test_to_perform != "none":
self._p_value = self.de_novo_test(test_to_perform)
self._most_likely_config: Optional[ParentInheritanceConfig] = None
self._mutation_from: MutationFrom = "none"
if test_to_perform != "none" and (de_novo_res := self.de_novo_test(test_to_perform, sig_level)):
self._p_value, self._most_likely_config, self._mutation_from = de_novo_res
# --- end de novo mutation-related fields/initialization ---

self._logger = logger or get_main_logger()

Expand Down Expand Up @@ -186,6 +204,14 @@ def adj_p_value(self, value: Optional[float]):
raise e
self._adj_p_value = value

@property
def most_likely_inheritance_config(self) -> Optional[InheritanceConfig]:
return self._most_likely_config

@property
def mutation_from(self) -> MutationFrom:
return self._mutation_from

@staticmethod
def _respects_strict_ci(c_gt, m_gt, f_gt) -> bool:
# First hypothesis: first allele from mother, second from father
Expand Down Expand Up @@ -251,6 +277,17 @@ def _respects_mi_ci(self, c_gt_ci, m_gt_ci, f_gt_ci, widen: float) -> Optional[b
self._logger.error(f"Encountered invalid child confidence intervals: {c_gt_ci} ({e})")
return None

@staticmethod
def _mutation_from_mat_pat_p_vals(mat_p: float, pat_p: float, unadj_sig_level: float) -> MutationFrom:
if mat_p < unadj_sig_level and pat_p < unadj_sig_level:
return "both"
elif mat_p < unadj_sig_level:
return "mat"
elif pat_p < unadj_sig_level:
return "pat"
else:
return "?"

def respects_mi(self, widen: Optional[float] = None) -> tuple[bool, bool, Optional[bool], Optional[bool]]:
fn = self._respects_decimal_ci if self._decimal else MILocusData._respects_strict_ci
respects_mi_strict = fn(self._child_gt, self._mother_gt, self._father_gt)
Expand All @@ -272,17 +309,27 @@ def respects_mi(self, widen: Optional[float] = None) -> tuple[bool, bool, Option

return respects_mi_strict, respects_mi_pm1, respects_mi_95_ci, respects_mi_99_ci

def de_novo_test(self, test: str) -> Optional[float]:
test_res = []
def de_novo_test(
self, test: str, unadj_sig_level: float
) -> Optional[tuple[float, ParentInheritanceConfig, MutationFrom]]:
most_likely_p_val: float = 0.0
most_likely_config: ParentInheritanceConfig = PARENT_INHERITANCE_CONFIGS[0]

cr_all = self._child_read_counts_flattened
cr_all_set = set(cr_all)

for config in INHERITANCE_CONFIGS[:4]: # Don't need child allele configs here
for config in PARENT_INHERITANCE_CONFIGS: # Don't need child allele configs here currently
# TODO: I'm pretty sure this can be more numpy-ified / sped up

mat_reads = self._mother_read_counts[config[2]]
pat_reads = self._father_read_counts[config[3]]
# TODO: split alleles, do all configs, and generate two p-values instead of one? or separately determine
# which allele comes from where?
# Proposed flow: 1) have two p-values per locus; probability of being drawn from the same dist for allele 0
# and for allele 1 - multiply together to determine 'most likely' (sketchy statistics)
# Then, after multiple testing correction, any p-values below the threshold are used to determine of
# there's a mutation from maternal, paternal, or both, right before output.

mat_reads = self._mother_read_counts[config[0]]
pat_reads = self._father_read_counts[config[1]]

parent_all = np.array((*mat_reads, *pat_reads))

Expand Down Expand Up @@ -330,14 +377,48 @@ def de_novo_test(self, test: str) -> Optional[float]:
# Same-variance requirements should hold almost all the time, unless there is suspected mosaicism
# (and then X2 should be used instead)
p_val = sst.mannwhitneyu(
cr_all,
parent_all,
cr_all,
alternative="greater" if test == "wmw-gt" else "two-sided",
use_continuity=False)[1]

test_res.append(float(p_val)) # cast to make sure we're in native float type
p_val_f = float(p_val) # cast to make sure we're in native float type

if p_val_f > most_likely_p_val:
most_likely_p_val = p_val_f
most_likely_config = config

# Figure out where mutation is probably from, if any (multiple testing correction will be applied later, so we
# will report a "mutation_from" which may not be significant in the end.)
mutation_from: MutationFrom = "none"

if most_likely_p_val < unadj_sig_level:
mat_reads = self._mother_read_counts[most_likely_config[0]]
pat_reads = self._father_read_counts[most_likely_config[1]]

mat_dist_config_1 = np.abs(np.mean(self._child_read_counts[0]) - np.mean(mat_reads))
pat_dist_config_1 = np.abs(np.mean(self._child_read_counts[1]) - np.mean(pat_reads))
dist_config_1 = mat_dist_config_1 + pat_dist_config_1

mat_dist_config_2 = np.abs(np.mean(self._child_read_counts[1]) - np.mean(mat_reads))
pat_dist_config_2 = np.abs(np.mean(self._child_read_counts[0]) - np.mean(pat_reads))
dist_config_2 = mat_dist_config_2 + pat_dist_config_2

if dist_config_2 > dist_config_1:
# Choose configuration 1, since it minimizes distance
mat_test = sst.mannwhitneyu(mat_reads, self._child_read_counts[0], use_continuity=False)[1]
pat_test = sst.mannwhitneyu(pat_reads, self._child_read_counts[1], use_continuity=False)[1]
mutation_from = self._mutation_from_mat_pat_p_vals(mat_test, pat_test, unadj_sig_level)
elif dist_config_1 > dist_config_2:
# Choose configuration 2, since it minimizes distance
mat_test = sst.mannwhitneyu(mat_reads, self._child_read_counts[1], use_continuity=False)[1]
pat_test = sst.mannwhitneyu(pat_reads, self._child_read_counts[0], use_continuity=False)[1]
mutation_from = self._mutation_from_mat_pat_p_vals(mat_test, pat_test, unadj_sig_level)
else:
# no difference between distances to guess mutation origin configuration
mutation_from = "?"

return max(test_res)
return most_likely_p_val, most_likely_config, mutation_from

def __iter__(self): # for dict casting
yield "contig", self._contig
Expand All @@ -356,8 +437,9 @@ def __iter__(self): # for dict casting

if self._p_value:
yield "p", self._p_value
if self._adj_p_value:
yield "p_adj", self._adj_p_value
yield "mutation_from", self._mutation_from
if self._adj_p_value:
yield "p_adj", self._adj_p_value

def __str__(self):
def _opt_ci_str(ci_str, level="95"):
Expand Down

0 comments on commit 884d568

Please sign in to comment.