Merge branch 'master' into kmc_issue15 #15
IsaacT1123 committed Apr 24, 2020
2 parents abc4f9c + bf5b125 commit 7fc2e86
Showing 3 changed files with 137 additions and 266 deletions.
227 changes: 120 additions & 107 deletions CMash/GroundTruth.py
@@ -1,16 +1,17 @@
import khmer
import marisa_trie as mt
import numpy as np
import os
import sys
import pandas as pd
from hydra import WritingBloomFilter, ReadingBloomFilter
from scipy.sparse import csc_matrix
import re
import screed
from argparse import ArgumentTypeError
import multiprocessing
import argparse
import subprocess
import json
from itertools import starmap
import tempfile

# The following is for ease of development (so I don't need to keep re-installing the tool)
try:
@@ -35,17 +36,21 @@
class TrueContainment:
"""
This class has functionality to compute the ground truth containment indices and return them in the same format
as the scripts (to ease future testing). It is **only** intended for:
1. Small training databases
as the scripts (to ease future testing). It is only intended for:
1. Small-ish training databases
2. Databases that were formed using genomes that you have direct access to (i.e. live on your file system)
"""

def __init__(self, training_database_file: str, k_sizes: str):
def __init__(self, training_database_file: str, k_sizes: str, temp_dir: str):
self.training_database_file = training_database_file
self.k_sizes = self.__parseNumList(k_sizes)
self.CEs = self.__import_database()
self.training_file_names = self.__return_file_names()
self.training_file_to_ksize_to_kmers = self.__compute_all_training_kmers()
self.temp_dir = temp_dir
if not os.path.exists(temp_dir):
os.mkdir(temp_dir)
# compute all the training k-mers up front
self._compute_all_training_kmers()

def __import_database(self) -> list:
"""
@@ -88,95 +93,96 @@ def __parseNumList(k_sizes_str: str) -> list:
return list(range(start, end + 1, increment))
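
For a concrete sense of what __parseNumList produces: assuming the elided parsing splits a dash-separated "start-end-increment" string (that format is an assumption, not shown in this hunk), a minimal sketch:

# Hedged sketch of __parseNumList, assuming a "start-end-increment" input format:
start, end, increment = (int(x) for x in "10-21-5".split("-"))
print(list(range(start, end + 1, increment)))  # [10, 15, 20]
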

@staticmethod
def __kmers(seq, ksize):
def _kmc_count(input_file_name: str, output_file_name: str, kmer_size: int, threads=2) -> None:
"""
yield all k-mers of len ksize from seq.
Returns an iterable object
:param seq: a DNA sequence
:type seq: str
:param ksize: a k-mer size
:type ksize: int
Calls KMC to count the k-mers in a given input file
:param input_file_name: path to the input sequence file (multifasta, fastq, fasta, or BAM; gzipped or uncompressed)
:type input_file_name: str
:param output_file_name: prefix for the KMC output database; the JSON log is written to <output_file_name>.log
:type output_file_name: str
:param kmer_size: the k-mer size to count
:type kmer_size: int
:param threads: number of threads KMC should use
:type threads: int
"""
for i in range(len(seq) - ksize + 1):
yield seq[i:i + ksize]
input_types = ['-fm', '-fq', '-fa', '-fbam']
success = False
for input_type in input_types:
res = subprocess.run(f"kmc -k{kmer_size} {input_type} -r -t{threads} -ci0 -cs3 -j{output_file_name}.log {input_file_name} {output_file_name} .", shell=True, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)
if res.returncode == 0:
success = True
break
if not success:
raise Exception(f"Unknown sequence format: must be one of multifasta, fastq, fasta, or BAM (gzipped or uncompressed). Culprit file is {input_file_name}. Command was {res.args}")
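
For reference, on the '-fa' attempt the f-string above expands to a command of the following shape (file names are illustrative; the trailing '.' is KMC's working directory for temporary files):

# Illustrative expansion of the kmc call above (FASTA input, k=21, 2 threads):
# kmc -k21 -fa -r -t2 -ci0 -cs3 -jref.fna_k_21.log ref.fna ref.fna_k_21 .
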

def _return_ksize_to_kmers(self, input_file: str) -> dict:
@staticmethod
def _kmc_return_distinct_kmers(kmc_log_file: str) -> int:
"""
Enumerates all the k-mers specified by self.k_sizes in the genome/metagenome specified by input_file.
:param input_file: a file path to a fna/fq/gzipped file containing DNA sequences
:type input_file: str
:return: a dictionary with keys corresponding to k-mer sizes in self.k_sizes, and values dictionaries containing canonical k-mers
:rtype: dict
Parses the KMC log file to return the number of distinct k-mers
:param kmc_log_file: path to the JSON log file that KMC wrote via -j
:type kmc_log_file: str
:return: the number of distinct k-mers reported in the log
:rtype: int
"""
k_sizes = self.k_sizes
k_size_to_kmers = dict()
# initialize all the k-mer sizes for the query file
for k_size in k_sizes:
k_size_to_kmers[k_size] = set()

# iterate over the file only once
for record in screed.open(input_file):
seq = record.sequence # get the sequence
seq = seq.upper() # convert to upper-case
seq_split_onlyACTG = notACTG.split(seq) # split it on non-ACTG sequences
# if there are no non-ACTG's we get only a single one
if len(seq_split_onlyACTG) == 1:
for k_size in k_sizes:
for kmer in self.__kmers(seq, k_size):
if kmer:
# Use canonical k-mers
temp_kmer = kmer
temp_kmer_rc = khmer.reverse_complement(kmer)
if temp_kmer < temp_kmer_rc:
k_size_to_kmers[k_size].add(temp_kmer) # add the kmer
else:
k_size_to_kmers[k_size].add(temp_kmer_rc) # add the reverse complement
# otherwise, we need to do the same thing for each of the subsequences
else:
for sub_seq in seq_split_onlyACTG:
if sub_seq:
for k_size in k_sizes:
for kmer in self.__kmers(seq, k_size):
if kmer:
# Use canonical k-mers
temp_kmer = kmer
temp_kmer_rc = khmer.reverse_complement(kmer)
if temp_kmer < temp_kmer_rc:
k_size_to_kmers[k_size].add(temp_kmer) # add the kmer
else:
k_size_to_kmers[k_size].add(temp_kmer_rc) # add the reverse complement
return k_size_to_kmers
with open(kmc_log_file, 'r') as fid:
res = json.load(fid)
return res['Stats']['#Unique_k-mers']
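
A minimal sketch of the JSON shape this parser assumes KMC's -j log to have (only the field read above is shown; the count is made up):

# Assumed shape of the KMC JSON log; only the one field used above is shown:
example_log = {"Stats": {"#Unique_k-mers": 123456}}
assert example_log["Stats"]["#Unique_k-mers"] == 123456
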

@staticmethod
def __return_containment_index(set1: set, set2: set):
def _kmc_return_intersection_count(kmc_input_file1: str, kmc_input_file2: str) -> int:
"""
Computes the containment index
:param set1: a set of k-mers
:type set1: set
:param set2: another set of k-mers
:type set2: set
:return: containment index |set1 \cap set2| / |set 1|
:rtype: float
Takes two kmc counted files, returns the number of k-mers in their intersection
:param kmc_input_file1: prefix of the first KMC database
:type kmc_input_file1: str
:param kmc_input_file2: prefix of the second KMC database
:type kmc_input_file2: str
"""
return len(set1.intersection(set2)) / float(len(set1))
dir_name = os.path.dirname(kmc_input_file1)
intersect_file = os.path.join(dir_name, f"{os.path.basename(kmc_input_file1)}_intersect_{os.path.basename(kmc_input_file2)}")
dump_file = os.path.join(dir_name, f"{os.path.basename(kmc_input_file1)}_intersect_{os.path.basename(kmc_input_file2)}_dump")
res = subprocess.run(f"kmc_tools simple {kmc_input_file1} -ci1 {kmc_input_file2} -ci1 intersect {intersect_file}", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if res.returncode != 0:
raise Exception(f"The command {res.args} failed to run and returned the returncode={res.returncode}")
res = subprocess.run(f"kmc_dump {intersect_file} {dump_file}; cat {dump_file} | wc -l", shell=True, capture_output=True)
if res.returncode != 0:
raise Exception(f"The command {res.args} failed to run and returned the returncode={res.returncode}")

def __compute_all_training_kmers(self):
"""
In a parallelized fashion, enumerate all the k-mers for the given self.k_sizes in the input training genomes.
:return: a dictionary with keys given by self.training_file_names, values are dictionaries: keys are k_sizes, values are sets of canonical k-mers
:rtype: dict
"""
training_file_to_ksize_to_kmers = dict()
num_threads = multiprocessing.cpu_count()
intersect_count = int(res.stdout)

# then clean up the mess
os.remove(f"{intersect_file}.kmc_pre")
os.remove(f"{intersect_file}.kmc_suf")
os.remove(dump_file)

return intersect_count
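
Concretely, the two subprocess calls above issue commands of the following shape (database prefixes are hypothetical). Since kmc_dump writes one k-mer per line, wc -l on the dump file yields the intersection size:

# Illustrative command shapes for two hypothetical KMC database prefixes:
# kmc_tools simple /tmp/gt/q_k_21 -ci1 /tmp/gt/t_k_21 -ci1 intersect /tmp/gt/q_k_21_intersect_t_k_21
# kmc_dump /tmp/gt/q_k_21_intersect_t_k_21 /tmp/gt/q_k_21_intersect_t_k_21_dump
# cat /tmp/gt/q_k_21_intersect_t_k_21_dump | wc -l
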

def __kmc_output_name_converter(self, input_file: str, k_size: str) -> str:
temp_dir = self.temp_dir
return f"{os.path.join(temp_dir, os.path.basename(input_file))}_k_{k_size}"
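
For example (paths hypothetical), with temp_dir set to /tmp/gt:

# self.__kmc_output_name_converter("/data/ref.fna", 21) -> "/tmp/gt/ref.fna_k_21"
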

def _compute_all_training_kmers(self):
num_threads = int(multiprocessing.cpu_count()/float(2))
to_compute = []
# create the tuples to be computed on: (input_file, output_kmc_file, kmer_size)
for training_file in self.training_file_names:
for k_size in self.k_sizes:
output_file = self.__kmc_output_name_converter(training_file, k_size)
to_compute.append((training_file, output_file, k_size))
pool = multiprocessing.Pool(processes=int(min(num_threads, len(self.training_file_names))))
# res is returned in the same order as self.training_file_names according to the docs
res = pool.map(self._return_ksize_to_kmers, self.training_file_names)
for (item, file_name) in zip(res, self.training_file_names):
training_file_to_ksize_to_kmers[file_name] = item
res = pool.starmap(self._kmc_count, to_compute)
# consume everything so we know the process has completed
for it in res:
pass
pool.close()
return training_file_to_ksize_to_kmers
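
Note that Pool.starmap already blocks until every task finishes and returns the results as a list, so the consuming loop above is a belt-and-braces no-op. A standalone sketch of the fan-out pattern used here (worker body and paths are illustrative):

# Standalone sketch of the starmap fan-out; in GroundTruth.py the worker is _kmc_count
import multiprocessing

def count_kmers(input_file, output_file, k_size):
    pass  # e.g. shell out to kmc here

if __name__ == "__main__":
    tasks = [("a.fna", "/tmp/gt/a.fna_k_10", 10), ("a.fna", "/tmp/gt/a.fna_k_15", 15)]
    with multiprocessing.Pool(processes=2) as pool:
        pool.starmap(count_kmers, tasks)
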

def __return_containment_indicies(self, query_file: str) -> np.ndarray:
def _return_containment_index(self, query_file: str, i: int, j: int) -> tuple:
k_size = self.k_sizes[j]
training_file = self.training_file_names[i]
training_kmc_output = self.__kmc_output_name_converter(training_file, k_size)
query_kmc_output = self.__kmc_output_name_converter(query_file, k_size)
numerator = self._kmc_return_intersection_count(query_kmc_output, training_kmc_output)
denominator = self._kmc_return_distinct_kmers(f"{training_kmc_output}.log")
return (i, j, numerator / float(denominator)) # | train \cap query| / | train |
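
A worked example of the containment index | train ∩ query | / | train | with plain sets standing in for the KMC databases (values are made up):

# Illustration only: sets replace the KMC intersection/count machinery above
train = {"ACG", "CGT", "GTA", "TAC"}
query = {"CGT", "GTA", "TTT"}
print(len(train & query) / float(len(train)))  # 2 / 4 = 0.5
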

def _return_containment_indicies(self, query_file: str) -> np.ndarray:
"""
Creates a matrix of containment indices:
for each i in self.training_file_names:
@@ -189,27 +195,35 @@ def __return_containment_indicies(self, query_file: str) -> np.ndarray:
"""
training_file_names = self.training_file_names
k_sizes = self.k_sizes
training_file_to_ksize_to_kmers = self.training_file_to_ksize_to_kmers
num_files = len(training_file_names)
# compute the k-mers in the query file
for k_size in k_sizes:
# each KMC database lands in self.temp_dir via __kmc_output_name_converter
self._kmc_count(query_file, self.__kmc_output_name_converter(query_file, k_size), k_size, threads=multiprocessing.cpu_count())

# compute the containment indices
# rows are the files, columns are the k-mer sizes
containment_indicies = np.zeros((num_files, len(k_sizes)))
# if the query file is part of the training files, then nothing extra to do
if query_file in training_file_names:
for (j, k_size) in enumerate(k_sizes):
query_kmers = training_file_to_ksize_to_kmers[query_file][k_size]
for (i, file_name) in enumerate(training_file_names):
training_kmers = training_file_to_ksize_to_kmers[file_name][k_size]
# | train \cap query| / | train |
containment_indicies[i, j] = self.__return_containment_index(training_kmers, query_kmers)
else:
# need to compute the k-mers in the query file
query_file_to_ksize_to_kmers = self._return_ksize_to_kmers(query_file)
for (j, k_size) in enumerate(k_sizes):
query_kmers = query_file_to_ksize_to_kmers[k_size]
for (i, file_name) in enumerate(training_file_names):
training_kmers = training_file_to_ksize_to_kmers[file_name][k_size]
# | train \cap query| / | train |
containment_indicies[i, j] = self.__return_containment_index(training_kmers, query_kmers)
containment_indicies = np.zeros((len(training_file_names), len(k_sizes)))

# serial version
#for (j, k_size) in enumerate(k_sizes):
# query_kmc_output = self.__kmc_output_name_converter(query_file, k_size)
# for (i, training_file) in enumerate(training_file_names):
# training_kmc_output = self.__kmc_output_name_converter(training_file, k_size)
# numerator = self._kmc_return_intersection_count(query_kmc_output, training_kmc_output)
# denominator = self._kmc_return_distinct_kmers(f"{training_kmc_output}.log")
# containment_indicies[i, j] = numerator / float(denominator) # | train \cap query| / | train |

# parallel version
to_compute = []
for i in range(len(training_file_names)):
for j in range(len(k_sizes)):
to_compute.append((query_file, i, j))
pool = multiprocessing.Pool(processes=int(min(multiprocessing.cpu_count(), len(self.training_file_names))))
res = pool.starmap(self._return_containment_index, to_compute)
for (i, j, ci) in res:
containment_indicies[i, j] = ci
pool.close()

return containment_indicies

def return_containment_data_frame(self, query_file: str, location_of_thresh: int, coverage_threshold: float) -> pd.DataFrame:
@@ -228,15 +242,14 @@ def return_containment_data_frame(self, query_file: str, location_of_thresh: int
"""
k_range = self.k_sizes
training_file_names = self.training_file_names
containment_indices = self.__return_containment_indicies(query_file)
containment_indices = self._return_containment_indicies(query_file)
df = Query.return_data_frame(training_file_names=training_file_names,
k_range=k_range,
location_of_thresh=location_of_thresh,
containment_indices=containment_indices,
coverage_threshold=coverage_threshold)
return df


def main():
parser = argparse.ArgumentParser(
description="This script calculates the *ground truth* containment indices for each of the training/reference sketches"
@@ -264,9 +277,9 @@ def main():
out_file = args.out_file
location_of_thresh = args.location_of_thresh
coverage_threshold = args.containment_threshold

temp_dir = tempfile.TemporaryDirectory()
# pre-compute the k-mers in the training database
g = TrueContainment(training_database_file=training_database_file, k_sizes=k_sizes)
g = TrueContainment(training_database_file=training_database_file, k_sizes=k_sizes, temp_dir=temp_dir.name)

# compute the containment indicies
df = g.return_containment_data_frame(query_file=query_file, location_of_thresh=location_of_thresh, coverage_threshold=coverage_threshold)
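
Putting the pieces together, a minimal Python-level usage sketch mirroring main() (every argument value below is illustrative, and the k-sizes string assumes the "start-end-increment" convention noted earlier):

# Minimal usage sketch; assumes kmc, kmc_tools, and kmc_dump are on the PATH.
# All file names and threshold values below are hypothetical.
import tempfile

temp_dir = tempfile.TemporaryDirectory()
g = TrueContainment(training_database_file="TrainingDatabase.h5", k_sizes="10-21-5", temp_dir=temp_dir.name)
df = g.return_containment_data_frame(query_file="query.fna", location_of_thresh=-1, coverage_threshold=0.1)
df.to_csv("ground_truth_out.csv")  # main() writes to args.out_file (elided above)
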
(The remainder of CMash/GroundTruth.py and the other two changed files are not rendered here.)
