Commit

Merge branch 'main' into squidpy
CarlinLiao committed Jul 28, 2023
2 parents dfcb152 + 9a57ec8 commit a13eb87
Showing 13 changed files with 379 additions and 273 deletions.
9 changes: 4 additions & 5 deletions spatialprofilingtoolbox/cggnn/scripts/run.py
@@ -1,6 +1,5 @@
"Run through the entire SPT CG-GNN pipeline using a local db config."
from argparse import ArgumentParser
from typing import Dict, Tuple
from os.path import join

from pandas import DataFrame
@@ -121,7 +120,7 @@ def parse_arguments():
return parser.parse_args()


def _create_cell_df(cell_dfs: Dict[str, DataFrame], feature_names: Dict[str, str]) -> DataFrame:
def _create_cell_df(cell_dfs: dict[str, DataFrame], feature_names: dict[str, str]) -> DataFrame:
"Find chemical species, phenotypes, and locations and merge into a DataFrame."

for specimen, df_specimen in cell_dfs.items():
@@ -140,7 +139,7 @@ def _create_cell_df(cell_dfs: Dict[str, DataFrame], feature_names: Dict[str, str]) -> DataFrame:


def _create_label_df(df_assignments: DataFrame,
df_strata: DataFrame) -> Tuple[DataFrame, Dict[int, str]]:
df_strata: DataFrame) -> tuple[DataFrame, dict[int, str]]:
"""Get slide-level results."""
df = merge(df_assignments, df_strata, on='stratum identifier', how='left')[
['specimen', 'subject diagnosed result']].rename(
@@ -164,8 +163,8 @@ def save_importances(_args):

if __name__ == "__main__":
args = parse_arguments()
study_data: Dict[str, Dict] = FeatureMatrixExtractor.extract(
args.spt_db_config_location)[args.study]
extractor = FeatureMatrixExtractor(database_config_file=args.spt_db_config_location)
study_data: dict[str, dict] = extractor.extract(study=args.study)

df_cell = _create_cell_df(
{slide: data['dataframe']
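For context on the typing change in this file: since Python 3.9 the builtin dict and tuple are subscriptable (PEP 585), so the typing.Dict and typing.Tuple imports can be dropped. A minimal sketch of the equivalence, with a hypothetical function:

    from pandas import DataFrame

    def first_frame_size(frames: dict[str, DataFrame]) -> tuple[str, int]:
        # Builtin generics (dict, tuple) replace typing.Dict and typing.Tuple
        # on Python 3.9 and later.
        name, frame = next(iter(frames.items()))
        return name, len(frame)
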
10 changes: 7 additions & 3 deletions spatialprofilingtoolbox/db/expressions_table_indexer.py
@@ -14,11 +14,15 @@ def ensure_indexed_expressions_table(connection):
ExpressionsTableIndexer.create_index(cursor)
connection.commit()

@staticmethod
def expressions_table_is_indexed_cursor(cursor):
columns = ExpressionsTableIndexer.get_expression_quantification_columns(cursor)
return 'source_specimen' in columns

@staticmethod
def expressions_table_is_indexed(connection):
with connection.cursor() as cursor:
columns = ExpressionsTableIndexer.get_expression_quantification_columns(cursor)
return 'source_specimen' in columns
return ExpressionsTableIndexer.expressions_table_is_indexed_cursor(cursor)

@staticmethod
def get_expression_quantification_columns(cursor):
@@ -31,7 +35,7 @@ def get_expression_quantification_columns(cursor):

@staticmethod
def create_index(cursor):
ETI = ExpressionsTableIndexer()
ETI = ExpressionsTableIndexer() #pylint: disable=invalid-name
ExpressionsTableIndexer.log_current_indexes(cursor)
logger.debug('Will create extra index column "source_specimen".')
ETI.create_extra_column(cursor)
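The split above lets callers that already hold a cursor skip connection handling. A rough usage sketch, assuming an open psycopg2 connection named connection and that the import path matches the file location:

    from spatialprofilingtoolbox.db.expressions_table_indexer import ExpressionsTableIndexer

    # Connection-level check, managing its own cursor as before:
    if ExpressionsTableIndexer.expressions_table_is_indexed(connection):
        ...

    # Cursor-level check, for callers that already hold a cursor:
    with connection.cursor() as cursor:
        if ExpressionsTableIndexer.expressions_table_is_indexed_cursor(cursor):
            ...
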
186 changes: 124 additions & 62 deletions spatialprofilingtoolbox/db/feature_matrix_extractor.py
@@ -1,10 +1,13 @@
"""
Convenience provision of a feature matrix for each study, the data retrieved
from the SPT database.
Convenience provision of a feature matrix for each study, the data retrieved from the SPT database.
"""
import sys
from enum import Enum
from enum import auto
from typing import cast

import pandas as pd
from psycopg2.extensions import cursor as Psycopg2Cursor

from spatialprofilingtoolbox.db.database_connection import DatabaseConnectionMaker
from spatialprofilingtoolbox.db.stratification_puller import StratificationPuller
@@ -16,26 +19,89 @@
logger = colorized_logger(__name__)


class DBSource(Enum):
"""Indicator of intended database source."""
CURSOR = auto()
CONFIG_FILE = auto()
UNKNOWN = auto()


class FeatureMatrixExtractor:
"""
Pull from the database and create convenience bundle of feature matrices
and metadata.
Pull from the database and create convenience bundle of feature matrices and metadata.
"""
@staticmethod
def extract(database_config_file, specimen: str=None, study: str=None, continuous_also=False):
E = FeatureMatrixExtractor
data_arrays = E.retrieve_expressions_from_database(database_config_file,
specimen=specimen,
study=study,
continuous_also=continuous_also)
centroid_coordinates = E.retrieve_structure_centroids_from_database(database_config_file,
specimen=specimen,
study=study)
stratification = E.retrieve_derivative_stratification_from_database(database_config_file)
study_component_lookup = E.retrieve_study_component_lookup(database_config_file)
merged = E.merge_dictionaries(
E.create_feature_matrices(data_arrays, centroid_coordinates),
E.create_channel_information(data_arrays),

cursor: Psycopg2Cursor
database_config_file: str | None
db_source: DBSource

def __init__(self,
cursor: Psycopg2Cursor | None=None,
database_config_file: str | None=None,
):
self.cursor = cast(Psycopg2Cursor, cursor)
self.database_config_file = database_config_file
if cursor is not None:
self.db_source = DBSource.CURSOR
elif database_config_file is not None:
self.db_source = DBSource.CONFIG_FILE
else:
self.db_source = DBSource.UNKNOWN
self._report_on_arguments()

def _report_on_arguments(self):
if self.cursor is None and self.database_config_file is None:
logger.error('Must supply either cursor or database_config_file.')
if self.cursor is not None and self.database_config_file is not None:
message = 'A cursor and database configuration file were both specified. Using the '\
'cursor.'
logger.warning(message)

def extract(self,
specimen: str | None=None,
study: str | None=None,
continuous_also: bool=False,
):
extraction = None
match self.db_source:
case DBSource.CURSOR:
extraction = self._extract(
specimen=specimen,
study=study,
continuous_also=continuous_also,
)
case DBSource.CONFIG_FILE:
with DatabaseConnectionMaker(self.database_config_file) as dcm:
with dcm.get_connection().cursor() as cursor:
self.cursor = cursor
extraction = self._extract(
specimen=specimen,
study=study,
continuous_also=continuous_also,
)
case DBSource.UNKNOWN:
logger.error('The database source can not be determined.')
return extraction

def _extract(self,
specimen: str | None=None,
study: str | None=None,
continuous_also: bool=False,
):
data_arrays = self._retrieve_expressions_from_database(
specimen=specimen,
study=study,
continuous_also=continuous_also,
)
centroid_coordinates = self._retrieve_structure_centroids_from_database(
specimen=specimen,
study=study,
)
stratification = self._retrieve_derivative_stratification_from_database()
study_component_lookup = self._retrieve_study_component_lookup()
merged = self._merge_dictionaries(
self._create_feature_matrices(data_arrays, centroid_coordinates),
self._create_channel_information(data_arrays),
stratification,
new_keys=['feature matrices','channel symbols by column name', 'sample cohorts'],
study_component_lookup=study_component_lookup,
Expand All @@ -57,52 +123,47 @@ def redact_dataframes(extraction):
extraction[study_name]['sample cohorts']['assignments'] = None
extraction[study_name]['sample cohorts']['strata'] = None

@staticmethod
def retrieve_expressions_from_database(database_config_file, specimen: str=None,
study: str=None, continuous_also=False):
def _retrieve_expressions_from_database(self,
specimen: str | None=None,
study: str | None=None,
continuous_also: bool=False,
):
logger.info('Retrieving expression data from database.')
with SparseMatrixPuller(database_config_file) as puller:
puller.pull(specimen=specimen, study=study, continuous_also=continuous_also)
data_arrays = puller.get_data_arrays()
puller = SparseMatrixPuller(self.cursor)
puller.pull(specimen=specimen, study=study, continuous_also=continuous_also)
data_arrays = puller.get_data_arrays()
logger.info('Done retrieving expression data from database.')
return data_arrays.get_studies()

@staticmethod
def retrieve_structure_centroids_from_database(database_config_file, specimen: str=None,
study: str=None):
def _retrieve_structure_centroids_from_database(self,
specimen: str | None=None,
study: str | None=None,
):
logger.info('Retrieving polygon centroids from shapefiles in database.')
with StructureCentroidsPuller(database_config_file) as puller:
puller.pull(specimen=specimen, study=study)
structure_centroids = puller.get_structure_centroids()
puller = StructureCentroidsPuller(self.cursor)
puller.pull(specimen=specimen, study=study)
structure_centroids = puller.get_structure_centroids()
logger.info('Done retrieving centroids.')
return structure_centroids.get_studies()

@staticmethod
def retrieve_derivative_stratification_from_database(database_config_file):
def _retrieve_derivative_stratification_from_database(self):
logger.info('Retrieving stratification from database.')
with StratificationPuller(database_config_file=database_config_file) as puller:
puller.pull()
stratification = puller.get_stratification()
puller = StratificationPuller(self.cursor)
puller.pull()
stratification = puller.get_stratification()
logger.info('Done retrieving stratification.')
return stratification

@staticmethod
def retrieve_study_component_lookup(database_config_file):
with DatabaseConnectionMaker(database_config_file=database_config_file) as maker:
connection = maker.get_connection()
cursor = connection.cursor()
cursor.execute('SELECT * FROM study_component ; ')
rows = cursor.fetchall()
cursor.close()
def _retrieve_study_component_lookup(self):
self.cursor.execute('SELECT * FROM study_component ; ')
rows = self.cursor.fetchall()
lookup = {}
for row in rows:
lookup[row[1]] = row[0]
return lookup
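        # The loop above is equivalent to a comprehension, reading each row
        # as (parent study, component study) -- column order assumed from
        # the loop body:
        #     lookup = {row[1]: row[0] for row in rows}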

@staticmethod
def create_feature_matrices(data_arrays, centroid_coordinates):
logger.info(
'Creating feature matrices from binary data arrays and centroids.')
def _create_feature_matrices(self, data_arrays, centroid_coordinates):
logger.info('Creating feature matrices from binary data arrays and centroids.')
matrices = {}
for k, study_name in enumerate(sorted(list(data_arrays.keys()))):
study = data_arrays[study_name]
@@ -112,7 +173,7 @@ def create_feature_matrices(data_arrays, centroid_coordinates):
expressions = study['data arrays by specimen'][specimen]
number_channels = len(study['target index lookup'])
rows = [
FeatureMatrixExtractor.create_feature_matrix_row(
self._create_feature_matrix_row(
centroid_coordinates[study_name][specimen][i],
expressions[i],
number_channels,
@@ -144,40 +205,41 @@ def create_feature_matrices(data_arrays, centroid_coordinates):
return matrices

@staticmethod
def create_feature_matrix_row(centroid, binary, number_channels):
def _create_feature_matrix_row(centroid, binary, number_channels):
template = '{0:0%sb}' % number_channels # pylint: disable=consider-using-f-string
feature_vector = [int(value) for value in list(template.format(binary)[::-1])]
return [centroid[0], centroid[1]] + feature_vector
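        # Worked example of the row layout (hypothetical values): with
        # number_channels=3 and binary=0b011, template.format(binary) gives
        # '011', reversal gives '110', so the row is [x, y, 1, 1, 0] -- the
        # least significant bit maps to the first channel column, F0.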

@staticmethod
def create_channel_information(data_arrays):
def _create_channel_information(self, data_arrays):
return {
study_name: FeatureMatrixExtractor.create_channel_information_for_study(study)
study_name: self._create_channel_information_for_study(study)
for study_name, study in data_arrays.items()
}

@staticmethod
def create_channel_information_for_study(study):
def _create_channel_information_for_study(self, study):
logger.info('Aggregating channel information for one study.')
targets = {int(index): target for target,
index in study['target index lookup'].items()}
symbols = {target: symbol for symbol,
target in study['target by symbol'].items()}
targets = {
int(index): target
for target, index in study['target index lookup'].items()
}
symbols = {
target: symbol
for symbol, target in study['target by symbol'].items()
}
logger.info('Done aggregating channel information.')
return {
f'F{i}': symbols[targets[i]]
for i in sorted([int(index) for index in targets.keys()])
}

@staticmethod
def merge_dictionaries(*args, new_keys: list, study_component_lookup: dict):
def _merge_dictionaries(self, *args, new_keys: list, study_component_lookup: dict):
if not len(args) == len(new_keys):
logger.error(
"Can not match up dictionaries to be merged with the list of key names to be "
"issued for them.")
sys.exit(1)

merged = {}
merged: dict = {}
for i in range(len(new_keys)):
for substudy, value in args[i].items():
merged[study_component_lookup[substudy]] = {}
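The refactor above replaces the config-file-only static extract with an instance API that accepts either a live psycopg2 cursor or a database config file. A rough usage sketch; the config path and study name are placeholders, and cursor stands for an open psycopg2 cursor:

    from spatialprofilingtoolbox.db.feature_matrix_extractor import FeatureMatrixExtractor

    # From a config file: extract() opens and closes its own connection.
    extractor = FeatureMatrixExtractor(database_config_file='.spt_db.config')
    bundle = extractor.extract(study='Example study')

    # From an existing cursor: extract() uses it directly.
    extractor = FeatureMatrixExtractor(cursor=cursor)
    bundle = extractor.extract(study='Example study')
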
@@ -41,7 +41,8 @@
except ModuleNotFoundError as e:
SuggestExtrasException(e, 'db')

bundle = FeatureMatrixExtractor.extract(database_config_file)
extractor = FeatureMatrixExtractor(database_config_file=database_config_file)
bundle: dict = extractor.extract()

for study_name, study in bundle.items():
for specimen, specimen_data in study['feature matrices']: