-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils.py
138 lines (125 loc) · 5.12 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/python
import os,sys
# from Bio import SeqIO
import numpy as np
import pandas as pd
import re
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix, matthews_corrcoef
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import accuracy_score, f1_score
from keras import backend as K
import tensorflow as tf
np.random.seed(12345)
def get_num(fasta_name, fasta_name2):
num = 0
for seq_record in SeqIO.parse(fasta_name,"fasta"):
if(not(re.search('N',str(seq_record.seq.upper())))):
num+=1
for seq_record2 in SeqIO.parse(fasta_name2,"fasta"):
if(not(re.search('N',str(seq_record2.seq.upper())))):
num+=1
return num
def get_pred_num(fasta_name):
num = 0
for seq_record in SeqIO.parse(fasta_name,"fasta"):
if(not(re.search('N',str(seq_record.seq)))):
num+=1
return num
'''Convert the input sequences into binary matrixs'''
def get_seq_matrix(fasta_name,seqmatrix,rank):
labellist = []
for seq_record in SeqIO.parse(fasta_name,"fasta"):
label = seq_record.id
sequence = seq_record.seq.upper()
if(re.search('N',str(sequence))):
continue
Acode = np.array(get_code(sequence,'A'),dtype=int)
Ucode = np.array(get_code(sequence,'U'),dtype=int)
Gcode = np.array(get_code(sequence,'G'),dtype=int)
Ccode = np.array(get_code(sequence,'C'),dtype=int)
seqcode = np.vstack((Acode,Ucode,Gcode,Ccode))
labellist.append(label)
seqmatrix[rank] = seqcode
rank +=1
return seqmatrix,labellist,rank
def get_code(seq,nt):
nts = ['A','U','G','C']
nts.remove(nt)
codes = str(seq).replace(nt,str(1))
for i in range(0,len(nts)):
codes = codes.replace(nts[i],str(0))
coding = list(codes)
for i in range(0,len(coding)):
coding[i] = float(coding[i])
return coding
'''Get the train, validation and test set from the input'''
def get_data(infile,infile2):
rank = 0
num = get_num(infile,infile2)
seqmatrix = np.zeros((num,4,201))
(seqmatrix, poslab, rank) = get_seq_matrix(infile,seqmatrix,rank)
(seqmatrix, neglab, rank) = get_seq_matrix(infile2,seqmatrix,rank)
X = seqmatrix
Y = np.array(poslab + neglab,dtype = int)
validation_size = 0.20
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=validation_size)
X_train, X_val, Y_train, Y_val = train_test_split(X_train, Y_train, test_size=validation_size)
return np.transpose(X_train,axes=(0,2,1)), np.transpose(X_val,axes=(0,2,1)), np.transpose(X_test,axes=(0,2,1)), Y_train, Y_val, Y_test
def get_test_data(infile,infile2):
rank = 0
num = get_num(infile,infile2)
seqmatrix = np.zeros((num,4,201))
(seqmatrix, poslab, rank) = get_seq_matrix(infile,seqmatrix,rank)
(seqmatrix, neglab, rank) = get_seq_matrix(infile2,seqmatrix,rank)
X = seqmatrix
Y = np.array(poslab + neglab,dtype = int)
return np.transpose(X,axes=(0,2,1)), Y
def get_pred_data(infile):
rank = 0
num = get_pred_num(infile)
seqmatrix = np.zeros((num,4,201))
(seqmatrix, gene, rank) = get_seq_matrix(infile,seqmatrix,rank)
X = seqmatrix
Y = np.array(gene,dtype = str)
return np.transpose(X,axes=(0,2,1)), Y
'''Calculate ROC AUC during model training, obtained from <https://github.com/nathanshartmann/NILC-at-CWI-2018>'''
def binary_PFA(y_true, y_pred, threshold=K.variable(value=0.5)):
y_pred = K.cast(y_pred >= threshold, 'float32')
N = K.sum(1 - y_true)
FP = K.sum(y_pred - y_pred * y_true)
return FP/N
def binary_PTA(y_true, y_pred, threshold=K.variable(value=0.5)):
y_pred = K.cast(y_pred >= threshold, 'float32')
P = K.sum(y_true)
TP = K.sum(y_pred * y_true)
return TP/P
def roc_auc(y_true, y_pred):
ptas = tf.stack([binary_PTA(y_true,y_pred,k) for k in np.linspace(0, 1, 1000)],axis=0)
pfas = tf.stack([binary_PFA(y_true,y_pred,k) for k in np.linspace(0, 1, 1000)],axis=0)
pfas = tf.concat([tf.ones((1,)) ,pfas],axis=0)
binSizes = -(pfas[1:]-pfas[:-1])
s = ptas*binSizes
return K.sum(s, axis=0)
'''Calculate the performance metrics'''
def metrics(Y_test, rounded, pred_train_prob, fileout):
confusion = confusion_matrix(Y_test, rounded)
TP = confusion[1, 1]
TN = confusion[0, 0]
FP = confusion[0, 1]
FN = confusion[1, 0]
sepcificity = TN / float( TN + FP)
sensitivity = TP / float(FN + TP)
mcc = matthews_corrcoef(Y_test, rounded)
f1 = f1_score(Y_test, rounded)
fpr, tpr, thresholds = roc_curve(Y_test, pred_train_prob)
aucvalue = auc(fpr, tpr)
fileout.write("Prediction:\n Accuracy: "+str(accuracy_score(Y_test, rounded))+"\n")
fileout.write("Sepcificity: "+str(sepcificity)+"\n")
fileout.write("Sensitivity: "+str(sensitivity)+"\n")
fileout.write("MCC: "+str(mcc)+"\n")
fileout.write("Fscore: "+str(f1)+"\n")
fileout.write("AUC: "+str(aucvalue)+"\n")
fileout.write("confusion matrix:"+str(confusion_matrix(Y_test, rounded))+"\n")
fileout.write("Classification report:"+str(classification_report(Y_test, rounded))+"\n")