Skip to content

Commit

Permalink
re-writing the classification alleles
Browse files Browse the repository at this point in the history
  • Loading branch information
luissian committed Mar 27, 2024
1 parent 5899055 commit eb22786
Showing 1 changed file with 157 additions and 122 deletions.
279 changes: 157 additions & 122 deletions taranis/allele_calling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from collections import OrderedDict
from pathlib import Path
from Bio import SeqIO
import pdb

log = logging.getLogger(__name__)
stderr = rich.console.Console(
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(
self.aligment_request = aligment_request

def assign_allele_type(
self, blast_result: list, allele_file: str, allele_name: str
self, blast_results: list, allele_file: str, allele_name: str
) -> list:
"""Assign allele type to the allele
Expand All @@ -72,20 +73,77 @@ def assign_allele_type(
allele_name (str): allele name
Returns:
list: containing allele classification, allele name and allele details
list: containing allele classification, allele match id, and allele
details
"""

def get_blast_details(blast_result: list, allele_name: str) -> list:
def check_if_plot(column_blast_res: list) -> bool:
"""Check if allele is partial length
Args:
column_blast_res (list): blast result
Returns:
bool: True if allele is partial length
"""
if (
column_blast_res[9] == "1" # check at contig start
# check if contig ends is the same as match allele ends
or column_blast_res[15] == column_blast_res[10]
or column_blast_res[10] == "1" # check reverse at contig end
# check if contig start is the same as match allele start reverse
or column_blast_res[9] == column_blast_res[15]
):
return True
return False

def discard_low_threshold_results(blast_results: list) -> list:
"""Discard blast results with lower threshold
Args:
blast_results (list): blast results
Returns:
list: blast results with higher query size
"""
valid_blast_result = []
for b_result in blast_results:
blast_split = b_result.split("\t")
# check if the division of the match contig length by the
# reference allele length is higher than the threshold
# pdb.set_trace()
if (int(blast_split[4]) / int(blast_split[3])) >= self.threshold:
valid_blast_result.append(b_result)
return valid_blast_result

def get_blast_details(blast_result: str, allele_name: str) -> list:
"""Collect blast details and modify the order of the columns
Args:
blast_result (list): information collected by running blast
blast_result (str): information collected by running blast
allele_name (str): allele name
Returns:
list: containing allele details in the correct order to be saved
blast_details[0] = sample name
blast_details[1] = contig name
blast_details[2] = core gene name
blast_details[3] = allele gene
blast_details[4] = coding allele type
blast_details[5] = reference allele length
blast_details[6] = match alignment length
blast_details[7] = contig length
blast_details[8] = match contig position start
blast_details[9] = match contig position end
blast_details[10] = direction
blast_details[11] = gene annotation
blast_details[12] = product annotation
blast_details[13] = allele quality
blast_details[14] = match sequence in contig
blast_details[15] = reference allele sequence
"""
match_allele_name = blast_result[0]
split_blast_result = blast_result.split("\t")
match_allele_name = split_blast_result[0]
try:
gene_annotation = self.prediction_data[match_allele_name]["gene"]
product_annotation = self.prediction_data[match_allele_name]["product"]
Expand All @@ -96,142 +154,115 @@ def get_blast_details(blast_result: list, allele_name: str) -> list:
gene_annotation = "Not found"
product_annotation = "Not found"
allele_quality = "Not found"
if int(blast_result[10]) > int(blast_result[9]):
# pdb.set_trace()
if int(split_blast_result[10]) > int(split_blast_result[9]):
direction = "+"
else:
direction = "-"
# get blast details
blast_details = [
self.s_name, # sample name
blast_result[1], # contig
split_blast_result[1], # contig name
allele_name, # core gene name
blast_result[0], # allele gene
split_blast_result[0], # allele gene
"coding", # coding allele type. To be filled later idx = 4
blast_result[3], # reference allele length
blast_result[4], # alignment length
blast_result[14], # contig length
blast_result[9], # contig start
blast_result[10], # contig end
split_blast_result[3], # reference allele length
split_blast_result[4], # match alignment length
split_blast_result[15], # contig length
split_blast_result[9], # match contig position start
split_blast_result[10], # match contig position end
direction,
gene_annotation,
product_annotation,
allele_quality,
blast_result[13], # contig sequence
split_blast_result[13], # match sequence in contig
split_blast_result[15], # reference allele sequence
]

# pdb.set_trace()
return blast_details

valid_blast_result = []
match_full_length = 0
match_partial_length = 0
for b_result in blast_result:
column_blast_res = b_result.split("\t")
query_length = int(column_blast_res[4]) / int(column_blast_res[3])
if query_length >= self.threshold:
valid_blast_result.append(b_result)
if query_length == 1:
match_full_length += 1
else:
match_partial_length += 1

if len(valid_blast_result) > 1:
# allele could be named as NIPHEM or NIPH
multi_allele = []
for valid_result in valid_blast_result:
column_blast_res = valid_result.split("\t")
column_blast_res[13] = column_blast_res[13].replace("-", "")
allele_details = get_blast_details(column_blast_res, allele_name)
if match_full_length >= 2:
# labled as NIPHEM if all alleles are in the same contig
allele_details[4] = "NIPHEM_" + allele_details[3]
classification = "NIPHEM"
else:
# labled as NIPH if all alleles are in different contigs
allele_details[4] = "NIPH_" + allele_details[3]
classification = "NIPH"
multi_allele.append(allele_details)
return [classification, allele_name, multi_allele]
def find_match_allele_schema(allele_file: str, match_sequence: str) -> str:
"""Find the allele name in the schema that match the sequence
elif len(valid_blast_result) == 1:
column_blast_res = blast_result[0].split("\t")
column_blast_res[13] = column_blast_res[13].replace("-", "")
allele_details = get_blast_details(column_blast_res, allele_name)
Args:
allele_file (str): file with allele sequences
match_sequence (str): sequence to be matched
Returns:
str: allele name in the schema that match the sequence
"""
grep_result = taranis.utils.grep_execution(
allele_file, column_blast_res[13], "-b1"
allele_file, match_sequence, "-b1"
)
# check if sequence match alleles in schema
if len(grep_result) > 0:
allele_name = grep_result[0].split(">")[1]

# allele is labled as EXACT
allele_details[4] = "EXC_" + allele_details[3]
return ["EXC", allele_name, allele_details]
# check if contig is shorter than allele
if int(column_blast_res[3]) > int(get_blast_detailscolumn_blast_res[4]):
# check if sequence is shorter because it starts or ends at the contig
if (
column_blast_res[9] == "1" # check at contig start
or column_blast_res[14]
== column_blast_res[10] # check at contig end
or column_blast_res[10] == "1" # check reverse at contig end
or column_blast_res[9]
== column_blast_res[14] # check reverse at contig start
):
# allele is labled as PLOT
allele_details[4] = "PLOT_" + allele_details[3]
return ["PLOT", allele_details[3], allele_details]
# allele is labled as ASM
allele_details[4] = "ASM_" + allele_details[3]
return ["ASM", allele_details[3], allele_details]
# check if contig is longer than allele
if int(column_blast_res[3]) < int(column_blast_res[4]):
# allele is labled as ALM
allele_details[4] = "ALM_" + allele_details[3]
return ["ALM", allele_details[3], allele_details]
if int(column_blast_res[3]) == int(column_blast_res[4]):
# allele is labled as INF

allele_details[3] = (
allele_name
+ "_"
+ str(
self.inf_alle_obj.get_inferred_allele(
column_blast_res[13], allele_name
)
return grep_result[0].split("_")[1]
return ""

valid_blast_results = discard_low_threshold_results(blast_results)
match_allele_schema = ""
if len(valid_blast_results) == 0:
# no match results labelled as LNF. details data filled with empty data
return ["LNF", "-", ["-," * 15]]
if len(valid_blast_results) > 1:
# could be NIPHEM or NIPH
b_split_data = []
match_allele_seq = []
for valid_blast_result in valid_blast_results:
multi_allele_data = get_blast_details(valid_blast_result, allele_name)
# get match allele sequence
match_allele_seq.append(multi_allele_data[14])
b_split_data.append(multi_allele_data)
# check if match allele is in schema
if match_allele_schema == "":
# find the allele in schema with the match sequence in the contig
match_allele_schema = find_match_allele_schema(
allele_file, multi_allele_data[14]
)
)
allele_details[4] = "INF_" + allele_details[3]
return ["INF", allele_details[3], allele_details]
if len(set(match_allele_seq)) == 1:
# all sequuences are equal labelled as NIPHEM
classification = "NIPHEM"
else:
# some of the sequences are different labelled as NIPH
classification = "NIPH"
# update coding allele type
for (idx,) in range(len(b_split_data)):
b_split_data[idx][4] = classification + "_" + match_allele_schema
else:
# analyze again the blast result to check with lower query size, 0.75
# it starts/ends at the contig. Then it is labled as PLOT

multi_allele = []
classification = ""
for b_result in blast_result:
column_blast_res = b_result.split("\t")
query_length = int(column_blast_res[4]) / int(column_blast_res[3])
if query_length >= 0.75:
if (
column_blast_res[9] == "1" # check at contig start
or column_blast_res[14]
== column_blast_res[10] # check at contig end
or column_blast_res[10] == "1" # check reverse at contig end
or column_blast_res[9]
== column_blast_res[14] # check reverse at contig start
):
allele_details = get_blast_details(
column_blast_res, allele_name
)
# allele is labled as PLOT
allele_details[4] = "PLOT_" + allele_details[3]
multi_allele.append(allele_details)
classification = "PLOT"
if classification == "PLOT":
return [classification, allele_details[4], multi_allele]
b_split_data = get_blast_details(valid_blast_results[0], allele_name)
# found the allele in schema with the match sequence in the contig
match_allele_schema = find_match_allele_schema(
allele_file, b_split_data[14]
)
# PLOT, ASM, ALM, INF, EXC are possible classifications
if check_if_plot(b_split_data):
# match allele is partial length labelled as PLOT
classification = "PLOT"

# check if match allele is shorter than reference allele
elif int(b_split_data[5]) < int(b_split_data[6]):
classification = "ASM"
# check if match allele is longer than reference allele
elif int(b_split_data[5]) > int(b_split_data[6]):
classification = "ALM"
else:
return ["LNF", "-", "LNF"]
# if sequence was not found after running grep labelled as INF
if match_allele_schema == "":
classification = "INF"
else:
# exact match found labelled as EXC
classification = "EXC"
# assign an identification value to the new allele
if match_allele_schema == "":
match_allele_schema = str(
self.inf_alle_obj.get_inferred_allele(b_split_data[14], allele_name)
)
# pdb.set_trace()
b_split_data[4] = classification + "_" + match_allele_schema
return [
classification,
classification + "_" + match_allele_schema,
b_split_data,
]

def search_match_allele(self):
# Create blast db with sample file
Expand Down Expand Up @@ -315,6 +346,7 @@ def parallel_execution(
schema: str,
prediction_data: dict,
reference_alleles: list,
threshold: float,
out_folder: str,
inf_alle_obj: object,
snp_request: bool = False,
Expand All @@ -325,6 +357,7 @@ def parallel_execution(
schema,
prediction_data,
reference_alleles,
threshold,
out_folder,
inf_alle_obj,
snp_request,
Expand Down Expand Up @@ -377,10 +410,10 @@ def stats_graphics(stats_folder: str, summary_result: dict) -> None:
"sample",
"contig",
"core gene",
"allele name",
"reference allele name",
"codification",
"query lenght",
"match lengt",
"query length",
"match length",
"contig length",
"contig start",
"contig stop",
Expand All @@ -395,6 +428,7 @@ def stats_graphics(stats_folder: str, summary_result: dict) -> None:
sample_allele_match = {} # used for allele match file

# get allele list
# pdb.set_trace()
first_sample = list(results[0].keys())[0]
allele_list = sorted(results[0][first_sample]["allele_type"].keys())
for result in results:
Expand All @@ -408,7 +442,8 @@ def stats_graphics(stats_folder: str, summary_result: dict) -> None:
sum_allele_type[type_of_allele] += 1
# add allele name match to sample
allele_match[allele] = (
type_of_allele + "_" + values["allele_match"][allele]
# type_of_allele + "_" + values["allele_match"][allele]
values["allele_match"][allele]
)
summary_result[sample] = sum_allele_type
sample_allele_match[sample] = allele_match
Expand Down

0 comments on commit eb22786

Please sign in to comment.