added pheno, removed multistudy to FME
CarlinLiao committed Aug 21, 2023
1 parent 3c1827b commit bdda7d7
Showing 8 changed files with 195 additions and 191 deletions.
spatialprofilingtoolbox/cggnn/extract.py (4 changes: 2 additions & 2 deletions)
@@ -7,7 +7,7 @@

from spatialprofilingtoolbox.db.feature_matrix_extractor import (
FeatureMatrixExtractor,
Bundle,
StudyBundle,
)


@@ -54,7 +54,7 @@ def extract_cggnn_data(
) -> tuple[DataFrame, DataFrame, dict[int, str]]:
"""Extract information cg-gnn needs from SPT."""
extractor = FeatureMatrixExtractor(database_config_file=spt_db_config_location)
study_data = cast(Bundle, extractor.extract(study=study))
study_data = cast(StudyBundle, extractor.extract(study=study))
df_cell = _create_cell_df(
{
slide: cast(DataFrame, data['dataframe'])
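For callers of this module, only the type name changes. A minimal sketch of the updated call pattern, assuming a hypothetical database config path and study name:

    from typing import cast

    from pandas import DataFrame

    from spatialprofilingtoolbox.db.feature_matrix_extractor import (
        FeatureMatrixExtractor,
        StudyBundle,
    )

    # The config path and study name below are hypothetical placeholders.
    extractor = FeatureMatrixExtractor(database_config_file='.spt_db.config')
    study_data = cast(StudyBundle, extractor.extract(study='Example study'))

    # One cell-by-feature DataFrame per specimen, under the same keys as above.
    for specimen, data in study_data['feature matrices'].items():
        dataframe = cast(DataFrame, data['dataframe'])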
spatialprofilingtoolbox/db/feature_matrix_extractor.py (252 changes: 131 additions & 121 deletions)
@@ -1,25 +1,26 @@
"""
Convenience provision of a feature matrix for each study, the data retrieved from the SPT database.
"""
"""Convenience provision of a feature matrix for each study, retrieved from the SPT database."""

from enum import Enum
from enum import auto
from typing import cast
from typing import cast, Any

from pandas import DataFrame
from psycopg2.extensions import cursor as Psycopg2Cursor

from spatialprofilingtoolbox import DatabaseConnectionMaker
from spatialprofilingtoolbox.db.exchange_data_formats.metrics import PhenotypeCriteria
from spatialprofilingtoolbox.db.phenotypes import PhenotypesAccess
from spatialprofilingtoolbox.db.stratification_puller import StratificationPuller
from spatialprofilingtoolbox.db.study_access import StudyAccess
from spatialprofilingtoolbox.workflow.common.structure_centroids_puller import \
StructureCentroidsPuller
from spatialprofilingtoolbox.workflow.common.sparse_matrix_puller import SparseMatrixPuller
from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger

logger = colorized_logger(__name__)

BundlePart = dict[str, DataFrame | str | dict[str, DataFrame | str ]]
Bundle = dict[str, dict[str, BundlePart]]
StudyBundle = dict[str, dict[str, Any]]


class DBSource(Enum):
"""Indicator of intended database source."""
@@ -35,9 +36,9 @@ class FeatureMatrixExtractor:
db_source: DBSource

def __init__(self,
cursor: Psycopg2Cursor | None=None,
database_config_file: str | None=None,
):
cursor: Psycopg2Cursor | None = None,
database_config_file: str | None = None,
) -> None:
self.cursor = cast(Psycopg2Cursor, cursor)
self.database_config_file = database_config_file
if cursor is not None:
@@ -57,10 +58,10 @@ def _report_on_arguments(self):
logger.warning(message)

def extract(self,
specimen: str | None=None,
study: str | None=None,
continuous_also: bool=False,
) -> Bundle | None:
specimen: str | None = None,
study: str | None = None,
continuous_also: bool = False,
) -> StudyBundle:
extraction = None
match self.db_source:
case DBSource.CURSOR:
@@ -83,10 +84,12 @@ def extract(self,
return extraction

def _extract(self,
specimen: str | None=None,
study: str | None=None,
continuous_also: bool=False,
) -> Bundle | None:
specimen: str | None = None,
study: str | None = None,
continuous_also: bool = False,
) -> StudyBundle:
if (specimen is None) == (study is None):
raise ValueError('Must specify exactly one of specimen or study.')
data_arrays = self._retrieve_expressions_from_database(
specimen=specimen,
study=study,
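The reworked _extract demands exactly one of the two selectors and raises ValueError otherwise; given a specimen, the parent study is looked up through StudyAccess. A short sketch, with placeholder argument values:

    from spatialprofilingtoolbox.db.feature_matrix_extractor import FeatureMatrixExtractor

    extractor = FeatureMatrixExtractor(database_config_file='.spt_db.config')  # hypothetical path
    extractor.extract(study='Example study')   # whole-study bundle
    extractor.extract(specimen='specimen A')   # study resolved via StudyAccess
    extractor.extract()                        # raises ValueError: give exactly one selector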
@@ -96,163 +99,170 @@ def _extract(self,
specimen=specimen,
study=study,
)
stratification = self._retrieve_derivative_stratification_from_database()
study_component_lookup = self._retrieve_study_component_lookup()
merged = self._merge_dictionaries(
self._create_feature_matrices(data_arrays, centroid_coordinates),
self._create_channel_information(data_arrays),
stratification,
new_keys=['feature matrices','channel symbols by column name', 'sample cohorts'],
study_component_lookup=study_component_lookup,
)
if merged is None:
return None
if study is not None:
for key in list(merged.keys()):
if not key == study:
del merged[key]
return merged
if study is None:
assert specimen is not None
study = StudyAccess(self.cursor).get_study_from_specimen(specimen)
channel_information = self._create_channel_information(data_arrays[study])
phenotypes, phenotype_information = self._retrieve_phenotypes(study, channel_information)
return {
'feature matrices': self._create_feature_matrices(
data_arrays[study],
centroid_coordinates[study],
phenotypes
),
'channel symbols by column name': channel_information,
'sample cohorts': self._retrieve_derivative_stratification_from_database()[study],
'phenotypes by column name': phenotype_information
}

@staticmethod
def redact_dataframes(extraction):
for study_name, study in extraction.items():
for specimen in study['feature matrices'].keys():
extraction[study_name]['feature matrices'][specimen]['dataframe'] = None
key = 'continuous dataframe'
if key in extraction[study_name]['feature matrices'][specimen]:
extraction[study_name]['feature matrices'][specimen][key] = None
extraction[study_name]['sample cohorts']['assignments'] = None
extraction[study_name]['sample cohorts']['strata'] = None
def redact_dataframes(study: dict[str, dict[str, Any]]) -> None:
for specimen in study['feature matrices'].keys():
study['feature matrices'][specimen]['dataframe'] = None
key = 'continuous dataframe'
if key in study['feature matrices'][specimen]:
study['feature matrices'][specimen][key] = None
study['sample cohorts']['assignments'] = None
study['sample cohorts']['strata'] = None

def _retrieve_expressions_from_database(self,
specimen: str | None=None,
study: str | None=None,
continuous_also: bool=False,
):
specimen: str | None = None,
study: str | None = None,
continuous_also: bool = False,
) -> dict[str, dict[str, Any]]:
logger.info('Retrieving expression data from database.')
puller = SparseMatrixPuller(self.cursor)
puller.pull(specimen=specimen, study=study, continuous_also=continuous_also)
data_arrays = puller.get_data_arrays()
logger.info('Done retrieving expression data from database.')
return data_arrays.get_studies()
return list(data_arrays.get_studies().values())[0]

def _retrieve_structure_centroids_from_database(self,
specimen: str | None=None,
study: str | None=None,
):
specimen: str | None = None,
study: str | None = None,
) -> dict[str, Any]:
logger.info('Retrieving polygon centroids from shapefiles in database.')
puller = StructureCentroidsPuller(self.cursor)
puller.pull(specimen=specimen, study=study)
structure_centroids = puller.get_structure_centroids()
logger.info('Done retrieving centroids.')
return structure_centroids.get_studies()
return list(structure_centroids.get_studies().values())[0]

def _retrieve_phenotypes(self,
study_name: str,
channel_names: dict[str, str]
) -> tuple[dict[str, PhenotypeCriteria], dict[str, str]]:
logger.info('Retrieving phenotypes from database.')
channel_to_column_name = {n: c for c, n in channel_names.items()}
phenotypes: dict[str, PhenotypeCriteria] = {}
phenotype_access = PhenotypesAccess(self.cursor)
phenotype_information: dict[str, str] = {}
for i, symbol_data in enumerate(phenotype_access.get_phenotype_symbols(study_name)):
column_name = f'P{i}'
symbol = symbol_data.handle_string
phenotypes[column_name] = phenotype_access.get_phenotype_criteria(study_name, symbol)
phenotype_information[column_name] = symbol

def _retrieve_derivative_stratification_from_database(self):
# Convert marker lists to FeatureMatrixExtractor column names.
phenotypes[column_name].positive_markers = [
channel_to_column_name[marker]
for marker in phenotypes[column_name].positive_markers
]
phenotypes[column_name].negative_markers = [
channel_to_column_name[marker]
for marker in phenotypes[column_name].negative_markers
]
logger.info('Done retrieving phenotypes.')
return phenotypes, phenotype_information

def _retrieve_derivative_stratification_from_database(self) -> dict[str, dict[str, Any]]:
logger.info('Retrieving stratification from database.')
puller = StratificationPuller(self.cursor)
puller.pull()
stratification = puller.get_stratification()
logger.info('Done retrieving stratification.')
return stratification

def _retrieve_study_component_lookup(self):
def _retrieve_study_component_lookup(self) -> dict[str, str]:
self.cursor.execute('SELECT * FROM study_component ; ')
rows = self.cursor.fetchall()
lookup = {}
lookup: dict[str, str] = {}
for row in rows:
lookup[row[1]] = row[0]
return lookup

def _create_feature_matrices(self, data_arrays, centroid_coordinates):
def _create_feature_matrices(self,
study: dict[str, dict[str, Any]],
centroid_coordinates: dict[str, Any],
phenotypes: dict[str, PhenotypeCriteria],
) -> dict[str, DataFrame]:
logger.info('Creating feature matrices from binary data arrays and centroids.')
matrices = {}
for k, study_name in enumerate(sorted(list(data_arrays.keys()))):
study = data_arrays[study_name]
matrices[study_name] = {}
for j, specimen in enumerate(sorted(list(study['data arrays by specimen'].keys()))):
for j, specimen in enumerate(sorted(list(study['data arrays by specimen'].keys()))):
logger.debug('Specimen %s .', specimen)
expressions = study['data arrays by specimen'][specimen]
number_channels = len(study['target index lookup'])
rows = [
self._create_feature_matrix_row(
centroid_coordinates[specimen][i],
expressions[i],
number_channels,
) for i in range(len(expressions))
]
dataframe = DataFrame(
rows,
columns=['pixel x', 'pixel y'] + [f'F{i}' for i in range(number_channels)],
)
for column_name, criteria in phenotypes.items():
dataframe[column_name] = (dataframe[criteria.positive_markers].all(axis=1) & \
~dataframe[criteria.negative_markers].any(axis=1)).astype(int)
matrices[specimen] = {
'dataframe': dataframe,
'filename': f'{j}.tsv',
}

if 'continuous data arrays by specimen' in study:
specimens = list(study['continuous data arrays by specimen'].keys())
for j, specimen in enumerate(sorted(specimens)):
logger.debug('Specimen %s .', specimen)
expressions = study['data arrays by specimen'][specimen]
expression_vectors = study['continuous data arrays by specimen'][specimen]
number_channels = len(study['target index lookup'])
rows = [
self._create_feature_matrix_row(
centroid_coordinates[study_name][specimen][i],
expressions[i],
number_channels,
)
for i in range(len(expressions))
]
dataframe = DataFrame(
rows,
columns=['pixel x', 'pixel y'] + [f'F{i}' for i in range(number_channels)],
expression_vectors,
columns=[f'F{i}' for i in range(number_channels)],
)
matrices[study_name][specimen] = {
'dataframe': dataframe,
'filename': f'{k}.{j}.tsv',
}

if 'continuous data arrays by specimen' in study:
specimens = list(study['continuous data arrays by specimen'].keys())
for j, specimen in enumerate(sorted(specimens)):
logger.debug('Specimen %s .', specimen)
expression_vectors = study['continuous data arrays by specimen'][specimen]
number_channels = len(study['target index lookup'])
dataframe = DataFrame(
expression_vectors,
columns=[f'F{i}' for i in range(number_channels)],
)
matrices[study_name][specimen]['continuous dataframe'] = dataframe
for column_name, criteria in phenotypes.items():
dataframe[column_name] = (dataframe[criteria.positive_markers].all(axis=1) & \
~dataframe[criteria.negative_markers].any(axis=1)).astype(int)
matrices[specimen]['continuous dataframe'] = dataframe

logger.info('Done creating feature matrices.')
return matrices

@staticmethod
def _create_feature_matrix_row(centroid, binary, number_channels):
def _create_feature_matrix_row(
centroid: tuple[float, float],
binary: int,
number_channels: int,
) -> list[float | int]:
template = '{0:0%sb}' % number_channels # pylint: disable=consider-using-f-string
feature_vector = [int(value) for value in list(template.format(binary)[::-1])]
feature_vector: list[int] = [int(value) for value in list(template.format(binary)[::-1])]
return [centroid[0], centroid[1]] + feature_vector

def _create_channel_information(self, data_arrays):
return {
study_name: self._create_channel_information_for_study(study)
for study_name, study in data_arrays.items()
}

def _create_channel_information_for_study(self, study):
def _create_channel_information(self,
study_information: dict[str, dict[str, Any]]
) -> dict[str, str]:
logger.info('Aggregating channel information for one study.')
targets = {
int(index): target
for target, index in study['target index lookup'].items()
for target, index in study_information['target index lookup'].items()
}
symbols = {
target: symbol
for symbol, target in study['target by symbol'].items()
for symbol, target in study_information['target by symbol'].items()
}
logger.info('Done aggregating channel information.')
return {
f'F{i}': symbols[targets[i]]
for i in sorted([int(index) for index in targets.keys()])
}

def _merge_dictionaries(self,
*args,
new_keys: list,
study_component_lookup: dict
) -> Bundle | None:
if not len(args) == len(new_keys):
logger.error(
"Can not match up dictionaries to be merged with the list of key names to be "
"issued for them."
)
return None

merged: dict = {}
for i in range(len(new_keys)):
for substudy, value in args[i].items():
merged[study_component_lookup[substudy]] = {}

for i, key in enumerate(new_keys):
for substudy, value in args[i].items():
merged[study_component_lookup[substudy]][key] = value

logger.info('Done merging into a single dictionary bundle.')
return merged
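For orientation, each feature-matrix row is ['pixel x', 'pixel y', F0, ..., F{n-1}]: _create_feature_matrix_row decodes the packed integer expression value least-significant-bit-first into the F columns. A self-contained sketch of that decoding, with a hypothetical four-channel value:

    # Mirror of the decoding in _create_feature_matrix_row; the channel count,
    # packed value, and centroid are hypothetical.
    number_channels = 4
    template = '{0:0%sb}' % number_channels      # '{0:04b}' for four channels
    binary = 5                                   # bits 0 and 2 set
    feature_vector = [int(value) for value in template.format(binary)[::-1]]
    assert feature_vector == [1, 0, 1, 0]        # F0=1, F1=0, F2=1, F3=0

    centroid = (10.5, 20.25)
    row = [centroid[0], centroid[1]] + feature_vector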