-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added files for protvec representation
- Loading branch information
1 parent
2ba33c8
commit f39cb1f
Showing
27 changed files
with
499 additions
and
0 deletions.
There are no files selected for viewing
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
#! /usr/bin/python | ||
|
||
# -*- coding: utf-8 -*- | ||
""" | ||
Created on Sun Nov 22 17:12:34 2015 | ||
@authors: Kaveh Karbasi & Ehsaneddin Asgari | ||
""" | ||
import sys | ||
sys.path.append('../') | ||
import os.path | ||
import timeit | ||
import re | ||
|
||
|
||
class PWAlignment:
    '''
    Global pairwise alignment of two sequences by dynamic programming.

    The substitution scores and the single (linear) gap penalty are read
    from a whitespace-separated matrix file chosen by the sequence type.
    The score matrix is filled in by the constructor, so the alignment
    score is available right after construction.

    Attributes set by the constructor:
        scores     -- dict mapping a two-character string to its score
        characters -- list of characters allowed by the scoring matrix
        gap_pnlty  -- integer gap penalty
        seq1, seq2 -- upper-cased input sequences
        matrix     -- rows follow seq2, columns follow seq1; every cell is
                      a two-item list [score, back-pointer]
    '''

    # Scoring-matrix file used for every supported sequence type.
    _SCORE_FILES = {
        'dna': "../config/dna_dist.txt",
        'rna': "../config/rna_dist.txt",
        'protein': "../config/protein_dist.txt",
    }

    # Back-pointer codes: 0 is diagonal, 1 is horizontal, 2 is vertical.
    _DIAGONAL = 0
    _LEFT = 1
    _UP = 2

    # Width of the chunks printed by printAlignment.
    _CHUNK = 80

    def __init__(self, seq1, seq2, seq_type):
        '''
        Load the scoring matrix for seq_type, validate both sequences and
        compute the alignment score matrix.

        :param seq1: first sequence (compared case-insensitively)
        :param seq2: second sequence (compared case-insensitively)
        :param seq_type: one of 'dna', 'rna' or 'protein'
        :raises SystemExit: after printing a message, when seq_type is not
            supported, the scoring file is malformed, or either sequence
            holds a character missing from the scoring matrix
        '''
        if seq_type not in self._SCORE_FILES:
            print("Not a valid type")
            sys.exit()
        # sets self.scores, self.characters and self.gap_pnlty
        self.create_scores_dict(self._SCORE_FILES[seq_type])

        self.seq1 = seq1.upper()
        self.seq2 = seq2.upper()

        # BOTH sequences have to be valid: a single unknown character would
        # otherwise surface later as a KeyError while filling the matrix.
        if not (self.checkSeqValidity(self.seq1) and self.checkSeqValidity(self.seq2)):
            print("Characters in the sequence does not match the scoring matrix")
            sys.exit()

        # One extra row and column for the empty prefix of each sequence.
        self.matrix = [[[0, 0] for _ in range(len(self.seq1) + 1)]
                       for _ in range(len(self.seq2) + 1)]
        self.calc_matrix()

    def create_scores_dict(self, mat_file_path):
        '''
        Parse a scoring-matrix file and store its content on the instance.

        Expected layout: a header line with the N allowed characters (no
        digits), N lines of N integers each, and a last line holding the
        single integer gap penalty. Trailing blank lines are ignored.

        Sets self.scores, self.characters and self.gap_pnlty; returns None.

        :param mat_file_path: path of the scoring-matrix file
        :raises SystemExit: after printing a message, on any format error
        '''
        # the context manager guarantees the file handle is released
        with open(mat_file_path) as infile:
            lines = infile.read().split("\n")
        # A final newline would otherwise add an empty "line" and make an
        # otherwise valid file fail the line-count check below.
        while len(lines) > 1 and not lines[-1].strip():
            lines.pop()

        self.scores = {}
        self.characters = lines[0].split()

        # the header must not contain any numerical character
        if any(ch.isdigit() for ch in lines[0]):
            print("Incorrect format detected in the scoring matrix:\n ** no numerical character is allowed")
            sys.exit()

        N = len(self.characters)
        # header + N matrix rows + gap penalty line
        if len(lines) != N + 2:
            print("Incorrect format detected in the scoring matrix :\n ** # of lines doesn't match the character set")
            sys.exit()

        for lindex, line in enumerate(lines[1:-1]):
            try:
                vals = [int(x) for x in line.split()]
            except ValueError:
                print("Incorrect format detected in the scoring matrix in line: " + str(
                    lindex + 2) + ":\n ** only numerical value is allowed")
                sys.exit()
            # every row needs exactly one value per character
            if len(vals) != N:
                print("Incorrect format detected in the scoring matrix in line: " + str(lindex + 2))
                sys.exit()
            row_char = self.characters[lindex]
            for col_char, val in zip(self.characters, vals):
                self.scores[row_char + col_char] = val

        # the last line is the gap penalty: exactly one integer
        try:
            vals = [int(x) for x in lines[-1].split()]
        except ValueError:
            print("Incorrect format detected in the scoring matrix: \n ** incorrect gap penalty: a single number ")
            sys.exit()
        if len(vals) != 1:
            print("Incorrect format detected in the scoring matrix: \n ** incorrect gap penalty: a single number")
            sys.exit()
        self.gap_pnlty = vals[0]

    def checkSeqValidity(self, sequence):
        '''
        Tell whether every character of the sequence is listed in the
        scoring matrix. On failure the set of characters of the sequence
        is printed.

        :param sequence: sequence to check
        :return: True when the sequence is valid, False otherwise
        '''
        seq_chars = set(sequence)
        if seq_chars <= set(self.characters):
            return True
        print(seq_chars)
        return False

    def calc_matrix(self):
        '''
        Fill self.matrix in place (dynamic programming).

        Each cell receives the best of the three possible moves together
        with the code of the move that produced it; on ties the diagonal
        wins over the horizontal move, which wins over the vertical one.
        '''
        matrix = self.matrix
        scores = self.scores
        gap = self.gap_pnlty
        n_rows = len(matrix)
        n_cols = len(matrix[0])

        # first column: only vertical moves (gaps in the first sequence)
        for i in range(n_rows):
            matrix[i][0][0] = gap * i
            matrix[i][0][1] = self._UP
        # first row: only horizontal moves (gaps in the second sequence);
        # runs last, so the origin cell ends up with the horizontal code
        for j in range(n_cols):
            matrix[0][j][0] = gap * j
            matrix[0][j][1] = self._LEFT

        for i in range(1, n_rows):
            row = matrix[i]
            above = matrix[i - 1]
            char2 = self.seq2[i - 1]
            for j in range(1, n_cols):
                # order matters: the index is the back-pointer code
                candidates = (
                    above[j - 1][0] + scores[self.seq1[j - 1] + char2],
                    row[j - 1][0] + gap,
                    above[j][0] + gap,
                )
                best = max(candidates)
                row[j][0] = best
                row[j][1] = candidates.index(best)

    def getScoreMatrix(self):
        '''
        Alignment score matrix getter.

        :return: self.matrix (cells are [score, back-pointer] lists)
        '''
        return self.matrix

    def getAlignScore(self):
        '''
        Alignment score getter.

        :return: the score stored in the bottom-right cell of the matrix
        '''
        return self.matrix[-1][-1][0]

    def _traceback(self):
        '''
        Follow the back-pointers from the bottom-right cell to the origin.

        :return: tuple (aligned seq1, aligned seq2) with '-' for gaps
        '''
        i = len(self.matrix) - 1
        j = len(self.matrix[0]) - 1
        # characters are collected back to front and reversed once at the
        # end, which avoids re-copying the strings on every step
        top = []
        bottom = []
        while i > 0 or j > 0:
            move = self.matrix[i][j][1]
            if move == self._DIAGONAL:
                top.append(self.seq1[j - 1])
                bottom.append(self.seq2[i - 1])
                i -= 1
                j -= 1
            elif move == self._LEFT:
                top.append(self.seq1[j - 1])
                bottom.append('-')
                j -= 1
            else:
                top.append('-')
                bottom.append(self.seq2[i - 1])
                i -= 1
        return ''.join(reversed(top)), ''.join(reversed(bottom))

    def printAlignment(self):
        '''
        Print the two aligned sequences in chunks of 80 characters.

        Each chunk is printed as a line of the first sequence followed by
        a line of the second one; chunks are separated by a blank line.
        Everything is written with a single print call.
        '''
        s1, s2 = self._traceback()
        # both aligned strings have the same length by construction
        pairs = [s1[k:k + self._CHUNK] + "\n" + s2[k:k + self._CHUNK]
                 for k in range(0, len(s1), self._CHUNK)]
        print("\n\n".join(pairs))

    def printScoreMatrix(self):
        '''
        Print the scores of the matrix as a '|' separated table.

        The first sequence labels the columns and the second sequence the
        rows; '*' labels the row and column of the empty prefix.
        Everything is written with a single print call.
        '''
        out_lines = ["|*|*|" + '|'.join(self.seq1) + "|"]
        row_labels = "*" + self.seq2
        for label, row in zip(row_labels, self.matrix):
            cells = ['|' + label] + [str(cell[0]) for cell in row]
            out_lines.append('|'.join(cells) + "|")
        print("\n".join(out_lines) + "\n")
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added
BIN
+5.11 KB
make_representations/__pycache__/sequencelist_representation.cpython-35.pyc
Binary file not shown.
Binary file added
BIN
+4.81 KB
make_representations/__pycache__/sequencelist_representation.cpython-36.pyc
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
__author__ = "Ehsaneddin Asgari" | ||
__license__ = "GPL" | ||
__version__ = "1.0.0" | ||
__maintainer__ = "Ehsaneddin Asgari" | ||
__email__ = "[email protected] or [email protected]" | ||
__project__ = "LLP - Life Language Processing" | ||
__website__ = "https://llp.berkeley.edu/" | ||
|
||
from alignment.pairwise_align import PWAlignment | ||
from sklearn.feature_extraction.text import TfidfVectorizer | ||
import itertools | ||
from gensim.models import KeyedVectors | ||
from scipy.sparse import csc_matrix | ||
import numpy as np | ||
from utility.file_utility import FileUtility | ||
|
||
|
||
class SequenceKmerRep(object):
    '''
    Character k-mer (tf / tf-idf) representation of a list of sequences,
    built with a TfidfVectorizer working on character n-grams of length
    k_mer.

    Attributes set by the constructor:
        vectorizer -- the fitted TfidfVectorizer
        X          -- sparse matrix, one row per sequence
        vocab      -- list of k-mers, one per column of X
    '''

    # Alphabets used to enumerate the vocabulary when restricted_kmer is set.
    _ALPHABETS = {
        'protein': 'arndcqeghilkmfpstwyvbzxuo',
        'dna': 'atcg',
        'rna': 'aucg',
    }

    def __init__(self, sequences, seq_type, k_mer, restricted_kmer=True, use_idf=False, norm=None, delete_empty_col=False):
        '''
        Class constructor: builds the vectorizer and fits it on sequences.

        :param sequences: iterable of sequence strings
        :param seq_type: 'protein', 'dna' or 'rna' (only consulted when
            restricted_kmer is True)
        :param k_mer: length of the character n-grams
        :param restricted_kmer: when True the vocabulary is every k-mer
            over the alphabet of seq_type; otherwise it is learned from
            the sequences
        :param use_idf: whether to use idf or not
        :param norm: normalization passed to TfidfVectorizer
        :param delete_empty_col: drop the columns whose sum is zero
        '''
        self.seq_type = seq_type
        self.sequences = sequences
        self.k_mer = k_mer
        self.restricted_kmer = restricted_kmer
        self.delete_empty_col = delete_empty_col
        self.set_tfidf_vect(use_idf=use_idf, norm=norm)
        self.set_tfidf_representation()

    def set_tfidf_vect(self, use_idf=False, norm=None):
        '''
        Create self.vectorizer (and self.vocab when restricted_kmer is set).

        :param use_idf: whether to use idf or not
        :param norm: whether to normalize or not
        :return:
        :raises ValueError: when restricted_kmer is set and seq_type has no
            known alphabet
        '''
        # options shared by the restricted and the unrestricted vectorizer
        options = dict(use_idf=use_idf, analyzer='char',
                       ngram_range=(self.k_mer, self.k_mer),
                       norm=norm, stop_words=[], lowercase=True, binary=False)
        if not self.restricted_kmer:
            self.vectorizer = TfidfVectorizer(**options)
            return
        try:
            alphabet = self._ALPHABETS[self.seq_type]
        except KeyError:
            # fail with a clear message instead of an AttributeError on
            # the never-assigned self.vocab
            raise ValueError(
                "Unsupported seq_type %r: expected one of %s"
                % (self.seq_type, sorted(self._ALPHABETS))) from None
        self.vocab = sorted(
            ''.join(letters)
            for letters in itertools.product(alphabet, repeat=self.k_mer))
        self.vectorizer = TfidfVectorizer(vocabulary=self.vocab, **options)

    def _feature_names(self):
        '''
        Return the vectorizer feature names as a list of str.

        get_feature_names() was removed in scikit-learn 1.2 in favour of
        get_feature_names_out(); the old name is kept as a fallback for
        older releases.
        '''
        getter = getattr(self.vectorizer, 'get_feature_names_out', None)
        if getter is None:
            getter = self.vectorizer.get_feature_names
        return [str(name) for name in getter()]

    def set_tfidf_representation(self):
        '''
        Fit the vectorizer on self.sequences and set self.X and self.vocab.
        When delete_empty_col is set, the columns summing to zero are
        removed from both and X is stored as a csc_matrix.
        :return:
        '''
        self.X = self.vectorizer.fit_transform(self.sequences)
        self.vocab = self._feature_names()
        if self.delete_empty_col:
            # column sums are taken on the sparse matrix, so the data is
            # never expanded to a dense array
            col_sums = np.asarray(self.X.sum(axis=0)).ravel()
            kept = np.flatnonzero(col_sums != 0)
            self.X = csc_matrix(self.X)[:, kept]
            self.vocab = [self.vocab[i] for i in kept]

    def get_representation(self, seq):
        '''
        Representation of an extra sequence with the already fitted
        vectorizer. The vectorizer is NOT re-fitted, so the vocabulary and
        idf weights learned from self.sequences are preserved.

        NOTE: the result has one column per vectorizer feature; columns
        removed by delete_empty_col are not removed here.

        :param seq: representation for an extra sequence
        :return: dense array with a single row
        '''
        return self.vectorizer.transform([seq]).toarray()
|
||
|
||
class SequenceKmerEmbRep(SequenceKmerRep):
    '''
    Embedding representation of sequences: the k-mer count matrix of
    SequenceKmerRep (empty columns removed) multiplied by one embedding
    vector per k-mer.

    A k-mer missing from the embedding model is replaced by the model word
    with the best pairwise alignment score. These replacements are cached
    through FileUtility under '../config/<k_mer>_in_model'.

    Attributes set by the constructor (besides the inherited ones):
        model      -- KeyedVectors loaded from embedding_file
        emb_trans  -- list with one embedding vector per entry of vocab
        embeddingX -- X.dot(emb_trans), one summed vector per sequence
    '''

    def __init__(self, embedding_file, sequences, seq_type, k_mer, restricted_kmer=True, use_idf=False, norm=None):
        '''
        Class constructor

        :param embedding_file: embedding file in word2vec text format
        :param sequences: iterable of sequence strings
        :param seq_type: 'protein', 'dna' or 'rna'
        :param k_mer: length of the k-mers
        :param restricted_kmer: see SequenceKmerRep
        :param use_idf: whether to use idf or not
        :param norm: normalization passed to TfidfVectorizer
        '''
        SequenceKmerRep.__init__(self, sequences, seq_type, k_mer, restricted_kmer=restricted_kmer, use_idf=use_idf,
                                 norm=norm, delete_empty_col=True)
        self.model = KeyedVectors.load_word2vec_format(
            embedding_file, binary=False)
        model = self.model

        cache_path = '../config/' + str(k_mer) + "_in_model"
        try:
            k_mer_dict = FileUtility.load_obj(cache_path)
        except Exception:
            # Best effort: presumably the cache does not exist yet on the
            # first run (verify against FileUtility.load_obj), so start
            # from an empty mapping. Exception rather than a bare except
            # keeps KeyboardInterrupt/SystemExit propagating.
            k_mer_dict = dict()

        # find a replacement for every k-mer the model does not know
        for kmer in self.vocab:
            key = kmer.lower()
            if kmer.upper() not in model and key not in k_mer_dict:
                k_mer_dict[key] = self.closest_kmer_in_model(key)
        FileUtility.save_obj(cache_path, k_mer_dict)

        # produce embedding mapping; the cache is keyed by lowercase k-mers
        self.emb_trans = [
            model[kmer.upper()] if kmer.upper() in model
            else model[k_mer_dict[kmer.lower()]]
            for kmer in self.vocab]
        # summation vector
        self.embeddingX = self.X.dot(self.emb_trans)

    def closest_kmer_in_model(self, k_mer):
        '''
        Look for the closest k-mer: the model word with the highest
        pairwise alignment score against k_mer. Words containing '<' are
        skipped; on ties the first word of the model wins.

        :param k_mer: k-mer to replace (compared in upper case)
        :return: the closest model word, '' when no word is eligible
        '''
        k_mer = k_mer.upper()
        # gensim 4 renamed index2word to index_to_key; support both
        words = getattr(self.model, 'index_to_key', None)
        if words is None:
            words = self.model.index2word
        # -inf instead of a fixed floor, so a candidate is always chosen
        # however low the alignment scores are
        best_score = float('-inf')
        closest = ''
        for word in words:
            if '<' in word:
                continue
            score = PWAlignment(k_mer, word, self.seq_type).getAlignScore()
            if score > best_score:
                best_score = score
                closest = word
        return closest
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
__author__ = "Ehsaneddin Asgari" | ||
__license__ = "GPL" | ||
__version__ = "1.0.0" | ||
__maintainer__ = "Ehsaneddin Asgari" | ||
__email__ = "[email protected] or [email protected]" | ||
__project__ = "LLP - Life Language Processing" | ||
__website__ = "https://llp.berkeley.edu/" | ||
|
||
|
||
from sklearn.feature_extraction.text import TfidfVectorizer | ||
|
||
class TextFeature(object):
    '''
    This class is to create feature matrix

    Attributes set by the constructor:
        tf_vec        -- matrix returned by TfidfVectorizer.fit_transform
        feature_names -- list of feature names, one per column of tf_vec
    '''
    def __init__(self, corpus, analyzer='word', ngram=(1,1), idf=False, norm=None, binary=False):
        '''
        Fit a TfidfVectorizer on the corpus; documents are tokenized with
        str.split and are not lower-cased.

        :param corpus: iterable of documents (strings)
        :param analyzer: analyzer passed to TfidfVectorizer
        :param ngram: (min_n, max_n) n-gram range
        :param idf: whether to use idf or not
        :param norm: normalization passed to TfidfVectorizer
        :param binary: passed to TfidfVectorizer as binary
        '''
        tfm = TfidfVectorizer(use_idf=idf, analyzer=analyzer, tokenizer=str.split, ngram_range=ngram, norm=norm, stop_words=[], lowercase=False, binary=binary)
        self.tf_vec = tfm.fit_transform(corpus)
        # get_feature_names() was removed in scikit-learn 1.2 in favour of
        # get_feature_names_out(); keep the old name as a fallback for
        # older releases and always store a plain list of str.
        getter = getattr(tfm, 'get_feature_names_out', None)
        if getter is None:
            getter = tfm.get_feature_names
        self.feature_names = [str(name) for name in getter()]
Oops, something went wrong.