Merge pull request #190 from broadinstitute/development
Release v1.8.4
knapii-developments authored Dec 14, 2020
2 parents 3f907bf + de9109e commit df7f8d0
Showing 15 changed files with 16,127 additions and 153 deletions.
52 changes: 28 additions & 24 deletions ingest/expression_files/dense_ingestor.py
@@ -241,31 +241,35 @@ def transform(self):
):

data_arrays.append(all_cell_model)
# Represents row as a list
for row in self.csv_file_handler:
valid_expression_scores, exp_cells = DenseIngestor.filter_expression_scores(
row[1:], self.header
)
exp_scores = DenseIngestor.process_row(valid_expression_scores)
gene = row[0]
GeneExpression.dev_logger.debug(f"Processing {gene}")
if gene in self.gene_names:
raise ValueError(f"Duplicate gene: {gene}")
self.gene_names[gene] = True

data_arrays, gene_models, num_processed = self.create_models(
exp_cells,
exp_scores,
gene,
None,
gene_models,
data_arrays,
num_processed,
False,
)
# Expression values of raw counts are not stored. However, cell names are.
if not GeneExpression.is_raw_count_file(
self.study_id, self.study_file_id, self.mongo_connection._client
):
# Represents row as a list
for row in self.csv_file_handler:
(
valid_expression_scores,
exp_cells,
) = DenseIngestor.filter_expression_scores(row[1:], self.header)
exp_scores = DenseIngestor.process_row(valid_expression_scores)
gene = row[0]
if gene in self.gene_names:
raise ValueError(f"Duplicate gene: {gene}")
self.gene_names[gene] = True

data_arrays, gene_models, num_processed = self.create_models(
exp_cells,
exp_scores,
gene,
None,
gene_models,
data_arrays,
num_processed,
False,
)
# Load any remaining models. This is necessary because the amount of
# models maybe less than the batch size.
if len(gene_models) > 0:
# models may be less than the batch size.
if len(gene_models) > 0 or len(data_arrays) > 0:
self.create_models(
[], [], None, None, gene_models, data_arrays, num_processed, True
)
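Taken together, the new guard means a dense raw-count matrix contributes only its cell-name data array, while processed matrices still stream gene rows into batched models that are flushed at the end. A minimal sketch of that batch-and-flush pattern, with a hypothetical `flush()` standing in for `create_models` and an assumed batch size (illustrative only, not the ingest pipeline's actual API):

```python
from typing import List

BATCH_SIZE = 3  # assumed value for illustration; the real ingest batch size may differ

def flush(gene_models: List[dict], data_arrays: List[dict]) -> None:
    # Hypothetical stand-in for the final create_models(..., True) call:
    # write whatever is buffered, even when the buffer is smaller than BATCH_SIZE.
    print(f"writing {len(gene_models)} gene models, {len(data_arrays)} data arrays")
    gene_models.clear()
    data_arrays.clear()

def ingest_dense(rows, is_raw_count: bool) -> None:
    gene_models, data_arrays, seen = [], [], set()
    if not is_raw_count:  # raw counts keep cell names only; expression values are skipped
        for gene, scores in rows:
            if gene in seen:
                raise ValueError(f"Duplicate gene: {gene}")
            seen.add(gene)
            gene_models.append({"name": gene, "scores": scores})
            if len(gene_models) >= BATCH_SIZE:
                flush(gene_models, data_arrays)
    # Load any remaining models; the leftover buffer may be smaller than the batch size.
    if len(gene_models) > 0 or len(data_arrays) > 0:
        flush(gene_models, data_arrays)

ingest_dense([("SOX2", [0.0, 1.2]), ("GAD1", [3.4, 0.0])], is_raw_count=False)
```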
19 changes: 11 additions & 8 deletions ingest/expression_files/expression_files.py
@@ -93,17 +93,17 @@ def create_gene_model(
)

@staticmethod
def is_raw_count(study_id, study_file_id, client):
def is_raw_count_file(study_id, study_file_id, client):
"Checks if study file is a raw count matrix"
COLLECTION_NAME = "study_files"
QUERY = {"_id": study_file_id, "study_id": study_id}

study_file_doc = list(client[COLLECTION_NAME].find(QUERY)).pop()
# The embedded document that holds 'is_raw_counts' is named expression_file_info.
# The embedded document that holds 'is_raw_count_files' is named expression_file_info.
# If the study file does not have an expression_file_info
# document, the "is_raw_counts" field will not exist.
# document, the "is_raw_count_files" field will not exist.
if "expression_file_info" in study_file_doc.keys():
return study_file_doc["expression_file_info"]["is_raw_counts"]
return study_file_doc["expression_file_info"]["is_raw_count_files"]
else:
return False
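For context, `is_raw_count_file` only inspects one embedded field of the matching `study_files` document. A hypothetical document shape that would satisfy the lookup (field values are invented placeholders):

```python
# Hypothetical study_files document; only _id, study_id, and
# expression_file_info.is_raw_count_files matter to this check.
study_file_doc = {
    "_id": "studyFileId",
    "study_id": "studyId",
    "file_type": "Expression Matrix",
    "expression_file_info": {"is_raw_count_files": True},
}

is_raw = study_file_doc.get("expression_file_info", {}).get("is_raw_count_files", False)
print(is_raw)  # True
```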

@@ -126,6 +126,8 @@ def query_cells(study_id, client, query_kwargs):

@staticmethod
def get_cell_names_from_study_file_id(study_id, study_file_id, client):
"""Returns cell names of study files of the same type. However, the cell names
from study file id that's passed into the function are not included."""
additional_query_kwargs = {}
study_files_ids = GeneExpression.get_study_expression_file_ids(
study_id, study_file_id, client
@@ -206,11 +208,13 @@ def get_study_expression_file_ids(
"file_type": {"$in": ["Expression Matrix", "MM Coordinate Matrix"]},
"$nor": [{"_id": current_study_file_id}],
}
is_raw_counts = GeneExpression.is_raw_count(
is_raw_count_files = GeneExpression.is_raw_count_file(
study_id, current_study_file_id, client
)
if is_raw_counts:
QUERY["$and"].append({"expression_file_info.is_raw_counts": is_raw_counts})
if is_raw_count_files:
QUERY["$and"].append(
{"expression_file_info.is_raw_count_files": is_raw_count_files}
)
# Returned fields query results
query_results = list(client[COLLECTION_NAME].find(QUERY, field_names))
return query_results
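When the current file is a raw-count matrix, the appended clause restricts the sibling-file query to other raw-count matrices. A hedged sketch of the assembled query, limited to the clauses visible in this hunk (the real QUERY also carries pre-existing `$and` conditions not shown here):

```python
current_study_file_id = "currentStudyFileId"  # placeholder for the real ObjectId

# Illustrative shape only, after the raw-count branch appends its filter.
QUERY = {
    "file_type": {"$in": ["Expression Matrix", "MM Coordinate Matrix"]},
    "$nor": [{"_id": current_study_file_id}],
    "$and": [{"expression_file_info.is_raw_count_files": True}],
}
```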
@@ -282,7 +286,6 @@ def create_models(
start_time = datetime.datetime.now()
model_id = ObjectId()

GeneExpression.dev_logger.debug(f"Creating models for {gene}")
if gene:
gene_models.append(
GeneExpression.create_gene_model(
120 changes: 66 additions & 54 deletions ingest/expression_files/mtx.py
@@ -294,14 +294,22 @@ def execute_ingest(self):
self.mongo_connection._client,
),
)
# Need fresh mtx file handler for get_data_start_line_number()
fresh_mtx_file_handler = self.resolve_path(self.mtx_path)[0]
if not MTXIngestor.is_sorted(self.mtx_path, fresh_mtx_file_handler):
new_mtx_file_path = MTXIngestor.sort_mtx(
self.mtx_path, fresh_mtx_file_handler
)
# Reset mtx variables to newly sorted file
self.mtx_file, self.mtx_path = self.resolve_path(new_mtx_file_path)
if not GeneExpression.is_raw_count_file(
self.study_id, self.study_file_id, self.mongo_connection._client
):
self.is_raw_count = False
# Need fresh mtx file handler for get_data_start_line_number()
fresh_mtx_file_handler = self.resolve_path(self.mtx_path)[0]
if not MTXIngestor.is_sorted(self.mtx_path, fresh_mtx_file_handler):
new_mtx_file_path = MTXIngestor.sort_mtx(
self.mtx_path, fresh_mtx_file_handler
)
# Reset mtx variables to newly sorted file
self.mtx_file, self.mtx_path = self.resolve_path(new_mtx_file_path)
else:
# Cell names are the only data stored for raw counts.
# Therefore, no need to sort files.
self.is_raw_count = True
self.transform()

def extract_feature_barcode_matrices(self):
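The sort check in `execute_ingest` and the `current_idx > prev_idx` guard in `transform` below both rely on the same property: data rows of the .mtx body are `gene_idx barcode_idx value`, and the gene index must never decrease. An illustrative check under that assumption (not the ingestor's actual `is_sorted` implementation):

```python
def mtx_rows_sorted(lines):
    """Return True if gene indices in MTX data rows never decrease (assumed semantics)."""
    prev_idx = 0
    for line in lines:
        gene_idx = int(line.split()[0])
        if gene_idx < prev_idx:
            return False
        prev_idx = gene_idx
    return True

print(mtx_rows_sorted(["1 5 2.0", "1 9 1.0", "2 3 4.0"]))  # True
print(mtx_rows_sorted(["2 3 4.0", "1 5 2.0"]))             # False
```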
@@ -326,6 +334,8 @@ def transform(self):
exp_cells = []
exp_scores = []
visited_expression_indices = {}
current_gene = None
current_gene_id = None

# All observed cells
for data_array in GeneExpression.create_data_arrays(
@@ -337,60 +347,62 @@
**self.data_array_kwargs,
):
data_arrays.append(data_array)
for row in self.mtx_file:
raw_gene_idx, raw_barcode_idx, raw_exp_score = row.split()
current_idx = int(raw_gene_idx)
if current_idx != prev_idx:
if not current_idx > prev_idx:
raise ValueError("MTX file must be sorted")
GeneExpression.dev_logger.debug(
f"Processing {self.genes[prev_idx - 1]}"
)
visited_expression_indices[current_idx] = True
if prev_idx != 0:
# Expressed cells and scores are associated with prior gene
prev_gene_id, prev_gene = MTXIngestor.get_features(
self.genes[prev_idx - 1]
# Create models for non-raw count files
if not self.is_raw_count:
for row in self.mtx_file:
raw_gene_idx, raw_barcode_idx, raw_exp_score = row.split()
current_idx = int(raw_gene_idx)
if current_idx != prev_idx:
if not current_idx > prev_idx:
raise ValueError("MTX file must be sorted")
GeneExpression.dev_logger.debug(
f"Processing {self.genes[prev_idx - 1]}"
)
# If the previous gene exists, load its models
visited_expression_indices[current_idx] = True
if prev_idx != 0:
# Expressed cells and scores are associated with prior gene
prev_gene_id, prev_gene = MTXIngestor.get_features(
self.genes[prev_idx - 1]
)
# If the previous gene exists, load its models
data_arrays, gene_models, num_processed = self.create_models(
exp_cells,
exp_scores,
prev_gene,
prev_gene_id,
gene_models,
data_arrays,
num_processed,
False,
)
exp_cells = []
exp_scores = []
prev_idx = current_idx
exp_cell = self.cells[int(raw_barcode_idx) - 1]
exp_score = round(float(raw_exp_score), 3)
exp_cells.append(exp_cell)
exp_scores.append(exp_score)

# create gene entries for genes with no positive expression values
for idx, gene in enumerate(self.genes):
if not visited_expression_indices.get(idx + 1):
current_gene_id, current_gene = MTXIngestor.get_features(gene)

data_arrays, gene_models, num_processed = self.create_models(
exp_cells,
exp_scores,
prev_gene,
prev_gene_id,
[],
[],
current_gene,
current_gene_id,
gene_models,
data_arrays,
num_processed,
False,
)
exp_cells = []
exp_scores = []
prev_idx = current_idx
exp_cell = self.cells[int(raw_barcode_idx) - 1]
exp_score = round(float(raw_exp_score), 3)
exp_cells.append(exp_cell)
exp_scores.append(exp_score)

# create gene entries for genes with no positive expression values
for idx, gene in enumerate(self.genes):
if not visited_expression_indices.get(idx + 1):
current_gene_id, current_gene = MTXIngestor.get_features(gene)

data_arrays, gene_models, num_processed = self.create_models(
[],
[],
current_gene,
current_gene_id,
gene_models,
data_arrays,
num_processed,
False,
)

# Create data array for last row
current_gene_id, current_gene = MTXIngestor.get_features(
self.genes[prev_idx - 1]
)
# Create data array for last row
current_gene_id, current_gene = MTXIngestor.get_features(
self.genes[prev_idx - 1]
)
self.create_models(
exp_cells,
exp_scores,
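The reshaped loop above still follows the classic run-length pattern for a gene-sorted sparse matrix: accumulate cells and scores while the gene index stays the same, and emit the previous gene's data when it changes. A stripped-down, standalone sketch of that grouping logic (not the ingestor's API):

```python
def group_by_gene(mtx_rows, genes, cells):
    """Yield (gene, expressed_cells, expression_scores) from gene-sorted MTX data rows."""
    prev_idx, exp_cells, exp_scores = 0, [], []
    for row in mtx_rows:
        gene_idx, barcode_idx, score = row.split()
        current_idx = int(gene_idx)
        if current_idx != prev_idx:
            if current_idx < prev_idx:
                raise ValueError("MTX file must be sorted")
            if prev_idx != 0:
                # Accumulated cells and scores belong to the prior gene
                yield genes[prev_idx - 1], exp_cells, exp_scores
            exp_cells, exp_scores = [], []
            prev_idx = current_idx
        exp_cells.append(cells[int(barcode_idx) - 1])
        exp_scores.append(round(float(score), 3))
    if prev_idx != 0:
        # Emit the last gene once the rows are exhausted
        yield genes[prev_idx - 1], exp_cells, exp_scores

rows = ["1 1 2.5", "1 3 1.0", "2 2 4.25"]
for gene, g_cells, g_scores in group_by_gene(rows, ["SOX2", "GAD1"], ["AAA", "AAC", "AAG"]):
    print(gene, g_cells, g_scores)
```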
28 changes: 17 additions & 11 deletions ingest/mongo_connection.py
@@ -42,27 +42,33 @@ def graceful_auto_reconnect(mongo_op_func):
"""Gracefully handles a reconnection event as well as other exceptions
for mongo.
"""
MAX_ATTEMPTS = 5

MAX_AUTO_RECONNECT_ATTEMPTS = 5
def retry(attempt_num):
if attempt_num < MAX_ATTEMPTS - 1:
wait_time = 0.5 * pow(2, attempt_num) # exponential back off
logging.warning(" Waiting %.1f seconds.", wait_time)
time.sleep(wait_time)

@functools.wraps(mongo_op_func)
def wrapper(*args, **kwargs):
for attempt in range(MAX_AUTO_RECONNECT_ATTEMPTS):
for attempt in range(MAX_ATTEMPTS):
try:
return mongo_op_func(*args, **kwargs)
except AutoReconnect as e:
if attempt < MAX_AUTO_RECONNECT_ATTEMPTS - 1:
wait_time = 0.5 * pow(2, attempt) # exponential back off
logging.warning(
"PyMongo auto-reconnecting... %s. Waiting %.1f seconds.",
str(e),
wait_time,
)
time.sleep(wait_time)
if attempt < MAX_ATTEMPTS - 1:
logging.warning("PyMongo auto-reconnecting... %s.", str(e))
retry(attempt)
else:
raise e
except BulkWriteError as bwe:
raise BulkWriteError(bwe.details)
if attempt < MAX_ATTEMPTS - 1:
logging.warning(
"Batch ops error occurred. Reinsert attempt %s.", str(attempt)
)
retry(attempt)
else:
raise BulkWriteError(bwe.details)
except Exception as e:
raise e

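With the retry helper factored out, both `AutoReconnect` and `BulkWriteError` now share the same exponential backoff (0.5 s, 1 s, 2 s, 4 s between the five attempts). A hedged, self-contained sketch of how a decorated Mongo write would behave; the simplified decorator and the `flaky_insert` call site are illustrative stand-ins, not the pipeline's actual code:

```python
import functools
import logging
import time

MAX_ATTEMPTS = 5

def graceful_auto_reconnect(mongo_op_func):
    # Simplified stand-in for the decorator above: retries any exception
    # with the same exponential backoff schedule (0.5 s, 1 s, 2 s, 4 s).
    @functools.wraps(mongo_op_func)
    def wrapper(*args, **kwargs):
        for attempt in range(MAX_ATTEMPTS):
            try:
                return mongo_op_func(*args, **kwargs)
            except Exception as e:
                if attempt == MAX_ATTEMPTS - 1:
                    raise
                wait_time = 0.5 * pow(2, attempt)
                logging.warning("Retrying after %s. Waiting %.1f seconds.", e, wait_time)
                time.sleep(wait_time)
    return wrapper

failures = {"left": 2}  # simulate two transient connection drops

@graceful_auto_reconnect
def flaky_insert(models):
    # Hypothetical Mongo write standing in for a bulk insert call.
    if failures["left"] > 0:
        failures["left"] -= 1
        raise ConnectionError("simulated AutoReconnect")
    return len(models)

print(flaky_insert([{"gene": "SOX2"}]))  # succeeds on the third attempt -> 1
```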
13 changes: 13 additions & 0 deletions ingest/validation/validate_metadata.py
@@ -179,6 +179,13 @@ def retrieve_ontology_term_label(self, term, property_name, convention):
def retrieve_ontology_term_label_remote(self, term, property_name, ontology_urls):
"""Look up official ontology term from an id, always checking against the remote
"""
# organ_region currently uses a single non-OLS ontology, the Allen Institute's Mouse Brain Atlas (MBA)
# MBA was originally downloaded April 2020 from the Allen Brain Atlas data portal URL
# http://api.brain-map.org/api/v2/data/query.csv?criteria=model::Structure,rma::criteria,[ontology_id$eq1],rma::options[order$eq%27structures.graph_order%27][num_rows$eqall]
# Mouse Brain Atlas IDs are numeric. To make the IDs easily attributable to their ontology,
# SCP study owners are expected to prepend "MBA" and pad IDs shorter than 9 digits with leading zeros.
# The expected formatting brings the MBA IDs into a format similar to other ontologies
# and avoids potential name collisions.
if property_name == "organ_region":
return self.retrieve_mouse_brain_term(term, property_name)
else:
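The MBA ID convention described in the comments above is mechanical enough to show directly. A hypothetical helper (not part of this diff) that applies the expected formatting:

```python
def format_mba_id(numeric_id: int) -> str:
    # Hypothetical helper: prepend "MBA" and zero-pad the numeric Allen Mouse
    # Brain Atlas ID to 9 digits, per the convention described above.
    return f"MBA{numeric_id:09d}"

print(format_mba_id(997))        # MBA000000997
print(format_mba_id(123456789))  # MBA123456789
```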
@@ -265,6 +272,12 @@ def retrieve_ols_term(self, ontology_urls, term, property_name):
raise RuntimeError(error_msg)

def retrieve_mouse_brain_term(self, term, property_name):
"""Determine whether ID is in mouse brain atlas (MBA) file
"""
# MBA ID is also the leaf entity of structure_id_path in the MBA file
# Entries with short structure_id_path seem to be synonymous with
# Uberon terms, suggesting MBA terms could be mapped as extensions of the
# Uberon ontology in future
mouse_brain_atlas = self.fetch_allen_mouse_brain_atlas()
MBA_id = parse_organ_region_ontology_id(term)
if MBA_id not in mouse_brain_atlas:
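The comment notes that the MBA ID doubles as the leaf entity of structure_id_path in the MBA file. Assuming structure_id_path is a slash-delimited lineage ending at the structure itself (an assumption about the Allen atlas CSV, not something this diff shows), extracting that leaf might look like:

```python
def leaf_of_structure_id_path(structure_id_path: str) -> str:
    # Assumed format: slash-delimited ancestor IDs ending at the structure itself,
    # e.g. "/997/8/567/" -> "567". Illustrative only.
    return structure_id_path.strip("/").split("/")[-1]

print(leaf_of_structure_id_path("/997/8/567/"))  # 567
```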

