Normalize database config file vs cursor etc #184

Merged · 15 commits · Jul 28, 2023
9 changes: 4 additions & 5 deletions spatialprofilingtoolbox/cggnn/scripts/run.py
@@ -1,6 +1,5 @@
"Run through the entire SPT CG-GNN pipeline using a local db config."
from argparse import ArgumentParser
from typing import Dict, Tuple
from os.path import join

from pandas import DataFrame
@@ -121,7 +120,7 @@ def parse_arguments():
return parser.parse_args()


def _create_cell_df(cell_dfs: Dict[str, DataFrame], feature_names: Dict[str, str]) -> DataFrame:
def _create_cell_df(cell_dfs: dict[str, DataFrame], feature_names: dict[str, str]) -> DataFrame:
"Find chemical species, phenotypes, and locations and merge into a DataFrame."

for specimen, df_specimen in cell_dfs.items():
@@ -140,7 +139,7 @@ def _create_cell_df(cell_dfs: Dict[str, DataFrame], feature_names: Dict[str, str


def _create_label_df(df_assignments: DataFrame,
df_strata: DataFrame) -> Tuple[DataFrame, Dict[int, str]]:
df_strata: DataFrame) -> tuple[DataFrame, dict[int, str]]:
"""Get slide-level results."""
df = merge(df_assignments, df_strata, on='stratum identifier', how='left')[
['specimen', 'subject diagnosed result']].rename(
@@ -164,8 +163,8 @@ def save_importances(_args):

if __name__ == "__main__":
args = parse_arguments()
study_data: Dict[str, Dict] = FeatureMatrixExtractor.extract(
args.spt_db_config_location)[args.study]
extractor = FeatureMatrixExtractor(database_config_file=args.spt_db_config_location)
study_data: dict[str, dict] = extractor.extract(study=args.study)

df_cell = _create_cell_df(
{slide: data['dataframe']
10 changes: 7 additions & 3 deletions spatialprofilingtoolbox/db/expressions_table_indexer.py
@@ -14,11 +14,15 @@ def ensure_indexed_expressions_table(connection):
ExpressionsTableIndexer.create_index(cursor)
connection.commit()

@staticmethod
def expressions_table_is_indexed_cursor(cursor):
columns = ExpressionsTableIndexer.get_expression_quantification_columns(cursor)
return 'source_specimen' in columns

@staticmethod
def expressions_table_is_indexed(connection):
with connection.cursor() as cursor:
columns = ExpressionsTableIndexer.get_expression_quantification_columns(cursor)
return 'source_specimen' in columns
return ExpressionsTableIndexer.expressions_table_is_indexed_cursor(cursor)

@staticmethod
def get_expression_quantification_columns(cursor):
@@ -31,7 +35,7 @@ def get_expression_quantification_columns(cursor):

@staticmethod
def create_index(cursor):
ETI = ExpressionsTableIndexer()
ETI = ExpressionsTableIndexer() #pylint: disable=invalid-name
ExpressionsTableIndexer.log_current_indexes(cursor)
logger.debug('Will create extra index column "source_specimen".')
ETI.create_extra_column(cursor)
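The new cursor-level variant lets callers that already hold a cursor (as FeatureMatrixExtractor now may) run the check without owning the connection. A minimal sketch of the two calling patterns, assuming psycopg2 is installed and using hypothetical connection credentials:

```python
import psycopg2

from spatialprofilingtoolbox.db.expressions_table_indexer import ExpressionsTableIndexer

connection = psycopg2.connect(dbname='spt', user='postgres')  # hypothetical credentials

# Connection-level check: opens its own cursor and delegates.
indexed = ExpressionsTableIndexer.expressions_table_is_indexed(connection)

# Cursor-level check: for callers that already hold an open cursor.
with connection.cursor() as cursor:
    indexed = ExpressionsTableIndexer.expressions_table_is_indexed_cursor(cursor)

connection.close()
```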
186 changes: 124 additions & 62 deletions spatialprofilingtoolbox/db/feature_matrix_extractor.py
@@ -1,10 +1,13 @@
"""
Convenience provision of a feature matrix for each study, the data retrieved
from the SPT database.
Convenience provision of a feature matrix for each study, the data retrieved from the SPT database.
"""
Collaborator:
pydocstyle says module docstrings should be one-liners if they fit on one line (including the start and end quotes). If not, there should be a short description that fits on one line, with elaboration after two line breaks.

Collaborator (Author):
Hm, yes, I've thought about this sometimes. There is an 8-character ambiguity, since turning a one-line comment block into a single comment line increases the size of the line by the size of """ """. In this case, that pushes past the 100-character line limit I've been using.
It seems the pydocstyle prescription asks to alter the content of the docstring in this case. This seems like arbitrary carelessness on the part of the style guide, since it follows that docstrings between 93 and 100 characters long are not allowed.

Collaborator:
I acquiesce and will reword to make it more concise, putting further detail in the body if it calls for it. Your effective short-description limit is about 10 characters shorter than your chosen line-limit setting.
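For concreteness, a minimal sketch of the two docstring shapes under discussion; the module content and function name are invented for illustration:

```python
"""One-line module docstring that fits, quotes included, within the line limit."""


def example_function() -> None:
    """Summarize the behavior in one short line.

    When the summary cannot fit on a single line with its quotes,
    elaboration goes here, after one blank line (two line breaks).
    """
```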

import sys
from enum import Enum
from enum import auto
from typing import cast

import pandas as pd
from psycopg2.extensions import cursor as Psycopg2Cursor

from spatialprofilingtoolbox.db.database_connection import DatabaseConnectionMaker
from spatialprofilingtoolbox.db.stratification_puller import StratificationPuller
@@ -16,26 +19,89 @@
logger = colorized_logger(__name__)


class DBSource(Enum):
"""Indicator of intended database source."""
CURSOR = auto()
CONFIG_FILE = auto()
UNKNOWN = auto()


class FeatureMatrixExtractor:
"""
Pull from the database and create convenience bundle of feature matrices
and metadata.
Pull from the database and create convenience bundle of feature matrices and metadata.
"""
@staticmethod
def extract(database_config_file, specimen: str=None, study: str=None, continuous_also=False):
E = FeatureMatrixExtractor
data_arrays = E.retrieve_expressions_from_database(database_config_file,
specimen=specimen,
study=study,
continuous_also=continuous_also)
centroid_coordinates = E.retrieve_structure_centroids_from_database(database_config_file,
specimen=specimen,
study=study)
stratification = E.retrieve_derivative_stratification_from_database(database_config_file)
study_component_lookup = E.retrieve_study_component_lookup(database_config_file)
merged = E.merge_dictionaries(
E.create_feature_matrices(data_arrays, centroid_coordinates),
E.create_channel_information(data_arrays),

cursor: Psycopg2Cursor
database_config_file: str | None
db_source: DBSource

def __init__(self,
cursor: Psycopg2Cursor | None=None,
database_config_file: str | None=None,
):
self.cursor = cast(Psycopg2Cursor, cursor)
self.database_config_file = database_config_file
if cursor is not None:
self.db_source = DBSource.CURSOR
elif database_config_file is not None:
self.db_source = DBSource.CONFIG_FILE
else:
self.db_source = DBSource.UNKNOWN
self._report_on_arguments()

def _report_on_arguments(self):
if self.cursor is None and self.database_config_file is None:
logger.error('Must supply either cursor or database_config_file.')
if self.cursor is not None and self.database_config_file is not None:
message = 'A cursor and database configuration file were both specified. Using the '\
'cursor.'
logger.warning(message)

def extract(self,
specimen: str | None=None,
study: str | None=None,
continuous_also: bool=False,
):
extraction = None
match self.db_source:
case DBSource.CURSOR:
extraction = self._extract(
specimen=specimen,
study=study,
continuous_also=continuous_also,
)
case DBSource.CONFIG_FILE:
with DatabaseConnectionMaker(self.database_config_file) as dcm:
with dcm.get_connection().cursor() as cursor:
self.cursor = cursor
extraction = self._extract(
specimen=specimen,
study=study,
continuous_also=continuous_also,
)
case DBSource.UNKNOWN:
logger.error('The database source can not be determined.')
return extraction

def _extract(self,
specimen: str | None=None,
study: str | None=None,
continuous_also: bool=False,
):
data_arrays = self._retrieve_expressions_from_database(
specimen=specimen,
study=study,
continuous_also=continuous_also,
)
centroid_coordinates = self._retrieve_structure_centroids_from_database(
specimen=specimen,
study=study,
)
stratification = self._retrieve_derivative_stratification_from_database()
study_component_lookup = self._retrieve_study_component_lookup()
merged = self._merge_dictionaries(
self._create_feature_matrices(data_arrays, centroid_coordinates),
self._create_channel_information(data_arrays),
stratification,
new_keys=['feature matrices','channel symbols by column name', 'sample cohorts'],
study_component_lookup=study_component_lookup,
@@ -57,52 +123,47 @@ def redact_dataframes(extraction):
extraction[study_name]['sample cohorts']['assignments'] = None
extraction[study_name]['sample cohorts']['strata'] = None

@staticmethod
def retrieve_expressions_from_database(database_config_file, specimen: str=None,
study: str=None, continuous_also=False):
def _retrieve_expressions_from_database(self,
specimen: str | None=None,
study: str | None=None,
continuous_also: bool=False,
):
logger.info('Retrieving expression data from database.')
with SparseMatrixPuller(database_config_file) as puller:
puller.pull(specimen=specimen, study=study, continuous_also=continuous_also)
data_arrays = puller.get_data_arrays()
puller = SparseMatrixPuller(self.cursor)
puller.pull(specimen=specimen, study=study, continuous_also=continuous_also)
data_arrays = puller.get_data_arrays()
logger.info('Done retrieving expression data from database.')
return data_arrays.get_studies()

@staticmethod
def retrieve_structure_centroids_from_database(database_config_file, specimen: str=None,
study: str=None):
def _retrieve_structure_centroids_from_database(self,
specimen: str | None=None,
study: str | None=None,
):
logger.info('Retrieving polygon centroids from shapefiles in database.')
with StructureCentroidsPuller(database_config_file) as puller:
puller.pull(specimen=specimen, study=study)
structure_centroids = puller.get_structure_centroids()
puller = StructureCentroidsPuller(self.cursor)
puller.pull(specimen=specimen, study=study)
structure_centroids = puller.get_structure_centroids()
logger.info('Done retrieving centroids.')
return structure_centroids.get_studies()

@staticmethod
def retrieve_derivative_stratification_from_database(database_config_file):
def _retrieve_derivative_stratification_from_database(self):
logger.info('Retrieving stratification from database.')
with StratificationPuller(database_config_file=database_config_file) as puller:
puller.pull()
stratification = puller.get_stratification()
puller = StratificationPuller(self.cursor)
puller.pull()
stratification = puller.get_stratification()
logger.info('Done retrieving stratification.')
return stratification

@staticmethod
def retrieve_study_component_lookup(database_config_file):
with DatabaseConnectionMaker(database_config_file=database_config_file) as maker:
connection = maker.get_connection()
cursor = connection.cursor()
cursor.execute('SELECT * FROM study_component ; ')
rows = cursor.fetchall()
cursor.close()
def _retrieve_study_component_lookup(self):
self.cursor.execute('SELECT * FROM study_component ; ')
rows = self.cursor.fetchall()
lookup = {}
for row in rows:
lookup[row[1]] = row[0]
return lookup

@staticmethod
def create_feature_matrices(data_arrays, centroid_coordinates):
logger.info(
'Creating feature matrices from binary data arrays and centroids.')
def _create_feature_matrices(self, data_arrays, centroid_coordinates):
logger.info('Creating feature matrices from binary data arrays and centroids.')
matrices = {}
for k, study_name in enumerate(sorted(list(data_arrays.keys()))):
study = data_arrays[study_name]
Expand All @@ -112,7 +173,7 @@ def create_feature_matrices(data_arrays, centroid_coordinates):
expressions = study['data arrays by specimen'][specimen]
number_channels = len(study['target index lookup'])
rows = [
FeatureMatrixExtractor.create_feature_matrix_row(
self._create_feature_matrix_row(
centroid_coordinates[study_name][specimen][i],
expressions[i],
number_channels,
@@ -144,40 +205,41 @@ def create_feature_matrices(data_arrays, centroid_coordinates):
return matrices

@staticmethod
def create_feature_matrix_row(centroid, binary, number_channels):
def _create_feature_matrix_row(centroid, binary, number_channels):
template = '{0:0%sb}' % number_channels # pylint: disable=consider-using-f-string
feature_vector = [int(value) for value in list(template.format(binary)[::-1])]
return [centroid[0], centroid[1]] + feature_vector

@staticmethod
def create_channel_information(data_arrays):
def _create_channel_information(self, data_arrays):
return {
study_name: FeatureMatrixExtractor.create_channel_information_for_study(study)
study_name: self._create_channel_information_for_study(study)
for study_name, study in data_arrays.items()
}

@staticmethod
def create_channel_information_for_study(study):
def _create_channel_information_for_study(self, study):
logger.info('Aggregating channel information for one study.')
targets = {int(index): target for target,
index in study['target index lookup'].items()}
symbols = {target: symbol for symbol,
target in study['target by symbol'].items()}
targets = {
int(index): target
for target, index in study['target index lookup'].items()
}
symbols = {
target: symbol
for symbol, target in study['target by symbol'].items()
}
logger.info('Done aggregating channel information.')
return {
f'F{i}': symbols[targets[i]]
for i in sorted([int(index) for index in targets.keys()])
}

@staticmethod
def merge_dictionaries(*args, new_keys: list, study_component_lookup: dict):
def _merge_dictionaries(self, *args, new_keys: list, study_component_lookup: dict):
if not len(args) == len(new_keys):
logger.error(
"Can not match up dictionaries to be merged with the list of key names to be "
"issued for them.")
sys.exit(1)

merged = {}
merged: dict = {}
for i in range(len(new_keys)):
for substudy, value in args[i].items():
merged[study_component_lookup[substudy]] = {}
@@ -41,7 +41,8 @@
except ModuleNotFoundError as e:
SuggestExtrasException(e, 'db')

bundle = FeatureMatrixExtractor.extract(database_config_file)
extractor = FeatureMatrixExtractor(database_config_file=database_config_file)
bundle: dict = extractor.extract()

for study_name, study in bundle.items():
for specimen, specimen_data in study['feature matrices']:
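Taken together, a minimal usage sketch of the normalized interface introduced by this PR; the config file path and study name below are hypothetical:

```python
from spatialprofilingtoolbox.db.feature_matrix_extractor import FeatureMatrixExtractor

# Mode 1: supply a database config file; extract() opens and manages the connection.
extractor = FeatureMatrixExtractor(database_config_file='.spt_db.config')  # hypothetical path
bundle = extractor.extract(study='Example study')  # hypothetical study name

# Mode 2: supply an existing psycopg2 cursor instead; per _report_on_arguments,
# the cursor takes precedence when both are given.
# extractor = FeatureMatrixExtractor(cursor=cursor)
# bundle = extractor.extract(study='Example study')
```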