Merge pull request #199 from broadinstitute/development

Release v1.9.0
knapii-developments authored Jan 6, 2021
2 parents b720fc0 + d424263 commit e143d33
Showing 10 changed files with 205 additions and 58 deletions.
6 changes: 6 additions & 0 deletions ingest/annotations.py
@@ -73,6 +73,12 @@ def determine_coordinates_and_cell_names(self):
if annot[0].lower() not in ("z", "y", "x", "name")
]

@staticmethod
def get_cell_names(df):
cell_names_str: str = df[("NAME", "TYPE")].to_string(index=False)
cell_names: list = cell_names_str.strip().splitlines()
return cell_names

def merge_df(self, first_df, second_df):
""" Does an inner join on a dataframe """
self.file = pd.merge(second_df, first_df, on=[("NAME", "TYPE")])
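
A quick, hypothetical sketch (not part of this commit) of how the new get_cell_names helper behaves; the toy dataframe below mirrors the NAME/TYPE header rows of SCP annotation files:

import pandas as pd

# The two header rows (NAME/TYPE) are modeled as a column MultiIndex.
columns = pd.MultiIndex.from_tuples([("NAME", "TYPE"), ("X", "numeric")])
df = pd.DataFrame(
    [["CELL_00010", 14.653], ["CELL_0001", 34.472]], columns=columns
)

cell_names_str = df[("NAME", "TYPE")].to_string(index=False)  # one name per line
cell_names = cell_names_str.strip().splitlines()
# strip() trims only the whole string, so to_string's column-width padding can
# leave leading spaces on later lines, depending on the pandas version,
# e.g. ['CELL_00010', ' CELL_0001'] -- compare test_get_cell_names below.
print(cell_names)
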
34 changes: 23 additions & 11 deletions ingest/ingest_pipeline.py
@@ -71,7 +71,7 @@
from clusters import Clusters
from expression_files.mtx import MTXIngestor
from expression_files.dense_ingestor import DenseIngestor
from monitor import setup_logger, trace, log_exception
from monitor import setup_logger, log_exception

except ImportError:
# Used when importing as external package, e.g. imports in single_cell_portal code
@@ -85,7 +85,7 @@
report_issues,
write_metadata_to_bq,
)
from .monitor import setup_logger, trace, log_exception
from .monitor import setup_logger, log_exception
from .cell_metadata import CellMetadata
from .clusters import Clusters
from .expression_files.dense_ingestor import DenseIngestor
@@ -186,7 +186,6 @@ def initialize_file_connection(self, file_type, file_path):
def insert_many(self, collection_name, documents):
self.db[collection_name].insert_many(documents)

@trace
# @profile
def load(
self,
@@ -401,15 +400,28 @@ def subsample(self):
if self.cell_metadata_file is not None:
try:
subsample.prepare_cell_metadata()
for data in subsample.subsample("study"):
load_status = self.load_subsample(
Clusters.COLLECTION_NAME,
data,
subsample.set_data_array,
"study",
# Get cell names from cluster and metadata files
cluster_cell_names = SubSample.get_cell_names(subsample.file)
metadata_cell_names = SubSample.get_cell_names(
subsample.cell_metadata.file
)
# Check that cell names in cluster file exist in cell metadata file
if SubSample.has_cells_in_metadata_file(
metadata_cell_names, cluster_cell_names
):
for data in subsample.subsample("study"):
load_status = self.load_subsample(
Clusters.COLLECTION_NAME,
data,
subsample.set_data_array,
"study",
)
if load_status != 0:
return load_status
else:
raise ValueError(
"Cluster file has cell names that are not present in cell metadata file."
)
if load_status != 0:
return load_status
except Exception as e:
log_exception(IngestPipeline.dev_logger, IngestPipeline.user_logger, e)
return 1
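
In outline, the subsample step now fails fast when the cluster file references cells missing from the metadata file. A simplified sketch with hypothetical cell names (the real code derives both lists via SubSample.get_cell_names, as in the diff above):

# Stand-in for SubSample.has_cells_in_metadata_file; data here is hypothetical.
def has_cells_in_metadata_file(metadata_cells, cluster_cells):
    return set(cluster_cells).issubset(set(metadata_cells))

cluster_cell_names = ["CELL_0001", "CELL_0002", "CELL_0099"]
metadata_cell_names = ["CELL_0001", "CELL_0002"]

if has_cells_in_metadata_file(metadata_cell_names, cluster_cell_names):
    pass  # proceed: subsample the "study" scope and load each document
else:
    # CELL_0099 is absent from the metadata file, so ingest raises instead
    raise ValueError(
        "Cluster file has cell names that are not present in cell metadata file."
    )
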
14 changes: 11 additions & 3 deletions ingest/mongo_connection.py
@@ -47,13 +47,21 @@ def graceful_auto_reconnect(mongo_op_func):
"""Gracefully handles a reconnection event as well as other exceptions
for mongo.
"""
import random
import math

MAX_ATTEMPTS = 5
# Adapted from https://stackoverflow.com/questions/46939285

def retry(attempt_num):
if attempt_num < MAX_ATTEMPTS - 1:
wait_time = 0.5 * pow(2, attempt_num) # exponential back off
dev_logger.warning(" Waiting %.1f seconds.", wait_time)
time.sleep(wait_time)
exp_backoff = pow(2, attempt_num)
max_jitter = math.ceil(exp_backoff * 0.2)
final_wait_time = exp_backoff + random.randint(
0, max_jitter
) # exponential back off
dev_logger.warning(" Waiting %.1f seconds.", final_wait_time)
time.sleep(final_wait_time)

@functools.wraps(mongo_op_func)
def wrapper(*args, **kwargs):
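
The fixed 0.5 * 2**attempt backoff is replaced with whole-second backoff plus up to ~20% random jitter, which staggers reconnection attempts from concurrent ingest jobs. A small illustration of the resulting wait windows (assuming MAX_ATTEMPTS = 5, as above):

import math
import random

MAX_ATTEMPTS = 5
for attempt_num in range(MAX_ATTEMPTS - 1):  # the final attempt is not retried
    exp_backoff = pow(2, attempt_num)           # 1, 2, 4, 8 seconds
    max_jitter = math.ceil(exp_backoff * 0.2)   # 1, 1, 1, 2 seconds
    final_wait_time = exp_backoff + random.randint(0, max_jitter)
    print(attempt_num, final_wait_time)  # waits land in 1-2, 2-3, 4-5, 8-10 s
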
15 changes: 10 additions & 5 deletions ingest/subsample.py
@@ -15,7 +15,7 @@


class SubSample(Annotations):
ALLOWED_FILE_TYPES = ['text/csv', 'text/plain', 'text/tab-separated-values']
ALLOWED_FILE_TYPES = ["text/csv", "text/plain", "text/tab-separated-values"]
MAX_THRESHOLD = 100_000
SUBSAMPLE_THRESHOLDS = [MAX_THRESHOLD, 20_000, 10_000, 1_000]

@@ -28,6 +28,11 @@ def __init__(self, cluster_file, cell_metadata_file=None):
cell_metadata_file, CellMetadata.ALLOWED_FILE_TYPES
)

@staticmethod
def has_cells_in_metadata_file(metadata_cells, cluster_cells):
"""Checks if cells in cluster are in metadata cells"""
return set(cluster_cells).issubset(set(metadata_cells))

def prepare_cell_metadata(self):
""" Does an inner join on cell and cluster file """
if self.cell_metadata is not None:
@@ -55,9 +60,9 @@ def bin(self, annotation: Tuple[str, str], scope: str):
bin = {}
# sample the annotation along with coordinates and cell names
columns_to_sample = copy.copy(self.coordinates_and_cell_headers)
if scope == 'cluster':
if scope == "cluster":
columns_to_sample.append(annotation[0])
if 'group' in annotation:
if "group" in annotation:
# get unique values in column
unique_values = self.file[annotation].unique()

@@ -96,7 +101,7 @@ def subsample(self, scope):
group_size = len(annotation_dict.keys())
# Dict of values for the x, y, and z coordinates
points = {k: [] for k in self.coordinates_and_cell_headers}
if scope == 'cluster':
if scope == "cluster":
points[annotation_name[0]] = []
num_per_group = int(sample_size / group_size)
cells_left = sample_size
@@ -137,7 +142,7 @@ def subsample(self, scope):
def return_sorted_bin(self, bin, annot_name):
"""Sorts binned groups in order of size from smallest to largest for group annotations """

if 'group' in annot_name:
if "group" in annot_name:
return sorted(bin.items(), key=lambda x: len(x[1]))
else:
return bin.items()
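
Note that has_cells_in_metadata_file is a plain subset test, and that the argument order is metadata cells first, cluster cells second. Hypothetical values for illustration:

>>> from subsample import SubSample  # assumed import path
>>> # True: every cluster cell ("a", "b") appears in the metadata cells
>>> SubSample.has_cells_in_metadata_file(["a", "b", "c"], ["a", "b"])
True
>>> # False: cluster cell "b" is absent from the metadata cells
>>> SubSample.has_cells_in_metadata_file(["a"], ["a", "b"])
False
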
2 changes: 1 addition & 1 deletion setup.py
@@ -5,7 +5,7 @@

setup(
name="scp-ingest-pipeline",
version="1.8.7",
version="1.9.0",
description="ETL pipeline for single-cell RNA-seq data",
long_description=long_description,
long_description_content_type="text/markdown",
17 changes: 17 additions & 0 deletions tests/data/good_subsample_cluster.csv
@@ -0,0 +1,17 @@
NAME,X,Y,Z,Average Intensity
TYPE,numeric,numeric,numeric,numeric
CELL_0001,34.472,32.211,60.035,0.719
CELL_0002,15.975,10.043,21.424,0.904
CELL_0003,-11.688,-53.645,-58.374,2.195
CELL_0004,30.04,31.138,33.597,-1.084
CELL_0005,23.862,33.092,26.904,4.256
CELL_0006,-39.07,-14.64,-44.643,1.317
CELL_0007,40.039,27.206,55.211,-4.917
CELL_0008,28.755,27.187,34.686,-3.777
CELL_0009,-48.601,-13.512,-51.659,0.778
CELL_00010,14.653,27.832,28.586,4.62
CELL_00011,20.603,32.071,45.484,-3.019
CELL_00012,-10.333,-51.733,-26.631,-4.989
CELL_00013,-52.966,-12.484,-60.369,3.137
CELL_00014,38.513,26.969,63.654,-1.74
CELL_00015,12.838,13.047,17.685,-2.443
7 changes: 6 additions & 1 deletion tests/data/metadata_example.txt
@@ -14,4 +14,9 @@ CELL_00011 CLST_C CLST_C_1 0.638
CELL_00012 CLST_C CLST_C_1 8.888
CELL_00013 CLST_C CLST_C_1 -2.27
CELL_00014 CLST_C CLST_C_2 -2.606
CELL_00015 CLST_C CLST_C_2 -9.089
CELL_00015 CLST_C CLST_C_2 -9.089
CELL_00016 CLST_C CLST_C_1 0.638
CELL_00017 CLST_C CLST_C_1 8.888
CELL_00018 CLST_C CLST_C_1 -2.27
CELL_00019 CLST_C CLST_C_2 -2.606
CELL_00020 CLST_C CLST_C_2 -9.089
101 changes: 70 additions & 31 deletions tests/test_annotations.py
@@ -30,27 +30,27 @@


class TestAnnotations(unittest.TestCase):
CLUSTER_PATH = '../tests/data/test_1k_cluster_data.csv'
CELL_METADATA_PATH = '../tests/data/valid_no_array_v2.0.0.txt'
CLUSTER_PATH = "../tests/data/test_1k_cluster_data.csv"
CELL_METADATA_PATH = "../tests/data/valid_no_array_v2.0.0.txt"

EXPONENT = -3

def setUp(self):
self.df = Annotations(
self.CLUSTER_PATH, ['text/csv', 'text/plain', 'text/tab-separated-values']
self.CLUSTER_PATH, ["text/csv", "text/plain", "text/tab-separated-values"]
)

def test_duplicate_headers(self):
"""Annotation headers should not contain duplicate values
"""
dup_headers = Annotations(
'../tests/data/dup_headers_v2.0.0.tsv',
['text/csv', 'text/plain', 'text/tab-separated-values'],
"../tests/data/dup_headers_v2.0.0.tsv",
["text/csv", "text/plain", "text/tab-separated-values"],
)

self.assertFalse(
dup_headers.validate_unique_header(),
'Duplicate headers should fail format validation',
"Duplicate headers should fail format validation",
)

with self.assertRaises(ValueError):
@@ -60,51 +60,51 @@ def test_header_format(self):
"""Header rows of metadata file should conform to standard
"""
error_headers = Annotations(
'../tests/data/error_headers_v2.0.0.tsv',
['text/csv', 'text/plain', 'text/tab-separated-values'],
"../tests/data/error_headers_v2.0.0.tsv",
["text/csv", "text/plain", "text/tab-separated-values"],
)

self.assertFalse(
error_headers.validate_header_keyword(),
'Missing NAME keyword should fail format validation',
"Missing NAME keyword should fail format validation",
)

self.assertFalse(
error_headers.validate_type_keyword(),
'Missing TYPE keyword should fail format validation',
"Missing TYPE keyword should fail format validation",
)

self.assertFalse(
error_headers.validate_type_annotations(),
'Invalid type annotations should fail format validation',
"Invalid type annotations should fail format validation",
)

def test_low_mem_artifact(self):
# pandas default of low_memory=True allows internal chunking during parsing
# causing inconsistent dtype coercion artifact for larger annotation files

lmtest = Annotations(
'../tests/data/low_mem_unit.txt',
['text/csv', 'text/plain', 'text/tab-separated-values'],
"../tests/data/low_mem_unit.txt",
["text/csv", "text/plain", "text/tab-separated-values"],
)
lmtest.preprocess()

# when low memory=True, the first row in the file would be in the first chunk
# and the numeric value was not properly coerced to become a string
assert isinstance(
lmtest.file['mixed_data']['group'][0], str
lmtest.file["mixed_data"]["group"][0], str
), "numeric value should be coerced to string"

# Per SCP-2545 NA values become strings for group annotations.
print(lmtest.file['mixed_data']['group'][2])
print(type(lmtest.file['mixed_data']['group'][2]))
print(lmtest.file["mixed_data"]["group"][2])
print(type(lmtest.file["mixed_data"]["group"][2]))
assert isinstance(
lmtest.file['mixed_data']['group'][2], str
lmtest.file["mixed_data"]["group"][2], str
), "expect empty cell conversion to NaN is string for group annotation"

# numeric value in second chunk should still properly be coerced to string type
assert isinstance(
lmtest.file['mixed_data']['group'][32800], str
lmtest.file["mixed_data"]["group"][32800], str
), "numeric value should be coerced to string"

def test_round(self):
@@ -113,7 +113,7 @@ def test_round(self):
self.df.preprocess()
for column in self.df.file.columns:
annot_type = column[1]
if annot_type == 'numeric':
if annot_type == "numeric":
value = str(self.df.file[column][ran_num])
print(Decimal(value).as_tuple().exponent)
assert (
@@ -127,7 +127,7 @@ def test_group_annotations(self):
header = column[0]
assert isinstance(header, str)
annot_type = column[1]
if annot_type == 'group':
if annot_type == "group":
# corrected testings of dataframe column dtype, using != always returns True
self.assertFalse(
np.issubdtype(self.df.file[column].dtypes, np.number),
@@ -136,33 +136,72 @@

def test_merge_df(self):
cluster = Clusters(
'../tests/data/test_1k_cluster_data.csv',
'dec0dedfeed1111111111111',
'addedfeed000000000000000',
'testCluster',
"../tests/data/test_1k_cluster_data.csv",
"dec0dedfeed1111111111111",
"addedfeed000000000000000",
"testCluster",
)
cell_metadata_df = Annotations(
self.CELL_METADATA_PATH,
['text/csv', 'text/plain', 'text/tab-separated-values'],
["text/csv", "text/plain", "text/tab-separated-values"],
)
cell_metadata_df.preprocess()
cell_names_cell_metadata_df = np.asarray(cell_metadata_df.file['NAME'])
cell_names_cluster_df = np.asarray(cluster.file['NAME'])
cell_names_cell_metadata_df = np.asarray(cell_metadata_df.file["NAME"])
cell_names_cluster_df = np.asarray(cluster.file["NAME"])
# Cell names found in both cluster and metadata files
common_cell_names = cell_names_cluster_df[
np.isin(cell_names_cluster_df, cell_names_cell_metadata_df)
]
print(f'common cell names: {common_cell_names}')
print(f"common cell names: {common_cell_names}")
# Perform merge
print(cluster.file[['NAME', 'x', 'y', 'z']])
cluster.merge_df(cluster.file[['NAME', 'x', 'y', 'z']], cell_metadata_df.file)
print(cluster.file[["NAME", "x", "y", "z"]])
cluster.merge_df(cluster.file[["NAME", "x", "y", "z"]], cell_metadata_df.file)

# Ensure ONLY common cell names found in cell metadata file and cluster file
# are in the newly merged df
result = all(
cell[0] in common_cell_names for cell in cluster.file['NAME'].values
cell[0] in common_cell_names for cell in cluster.file["NAME"].values
)
self.assertTrue(
result,
f"Merge was not performed correctly. Merge should be performed on 'NAME'",
)

def test_get_cell_names(self):
import pandas as pd

expected_cell_names = [
"CELL_0001",
" CELL_0002",
" CELL_0003",
" CELL_0004",
" CELL_0005",
" CELL_0006",
" CELL_0007",
" CELL_0008",
" CELL_0009",
" CELL_00010",
" CELL_00011",
" CELL_00012",
" CELL_00013",
" CELL_00014",
" CELL_00015",
" CELL_00016",
" CELL_00017",
" CELL_00018",
" CELL_00019",
" CELL_00020",
]
column_names = [
("NAME", "TYPE"),
("Cluster", "group"),
("Sub-Cluster", "group"),
("Average Intensity", "numeric"),
]
index = pd.MultiIndex.from_tuples(column_names)

df = pd.read_csv(
"../tests/data/metadata_example.txt", sep="\t", names=index, skiprows=2
)
cells = Annotations.get_cell_names(df)
self.assertEqual(cells, expected_cell_names)
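
(The leading spaces in expected_cell_names above are deliberate: pandas' to_string(index=False) pads values when rendering, and get_cell_names strips only the leading and trailing whitespace of the whole string, so interior lines keep their padding.)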