diff --git a/spatialprofilingtoolbox/db/squidpy_metrics.py b/spatialprofilingtoolbox/db/squidpy_metrics.py index aab720d81..b27982fba 100644 --- a/spatialprofilingtoolbox/db/squidpy_metrics.py +++ b/spatialprofilingtoolbox/db/squidpy_metrics.py @@ -6,6 +6,7 @@ from pandas import DataFrame from anndata import AnnData from squidpy.gr import spatial_autocorr, spatial_neighbors +from psycopg2.extensions import cursor as Psycopg2Cursor from spatialprofilingtoolbox.db.database_connection import DatabaseConnectionMaker from spatialprofilingtoolbox.db.feature_matrix_extractor import FeatureMatrixExtractor @@ -61,18 +62,14 @@ def convert_df_to_anndata( return data -def _spatial_autocorr(data: AnnData) -> tuple[NDArray[Any], NDArray[Any], NDArray[Any]]: - autocorr_metrics: DataFrame = spatial_autocorr( +def _spatial_autocorr(data: AnnData) -> DataFrame: + return spatial_autocorr( data, attr='obs', genes=data.obs.drop('cluster', axis=1).columns.tolist(), corr_method=None, copy=True, ) - i_statistic = autocorr_metrics['I'].to_numpy() - pval_z_sim = autocorr_metrics['pval_z_sim'].to_numpy() - pval_sim = autocorr_metrics['pval_sim'].to_numpy() - return i_statistic, pval_z_sim, pval_sim def create_and_transcribe_squidpy_features( @@ -81,13 +78,13 @@ def create_and_transcribe_squidpy_features( ) -> None: """Transcribe "off-demand" Squidpy feature(s) in features system.""" connection = database_connection_maker.get_connection() + das = DataAnalysisStudyFactory( + connection, study, 'spatial autocorrelation').create() features_by_specimen = _fetch_cells_and_phenotypes( - database_connection_maker, study) + connection.cursor(), study) for sample, df in features_by_specimen.items(): adata = convert_df_to_anndata(df) autocorr_stats = _spatial_autocorr(adata) - das = DataAnalysisStudyFactory( - connection, study, 'spatial autocorrelation').create() with ADIFeaturesUploader( database_connection_maker, data_analysis_study=das, @@ -95,16 +92,17 @@ def create_and_transcribe_squidpy_features( _describe_spatial_autocorr_derivation_method(), 1), impute_zeros=True, ) as feature_uploader: - value = autocorr_stats - for specifier, value in zip(['thing', 'thing'], autocorr_stats): - feature_uploader.stage_feature_value( - (specifier,), sample, value) + for i_cluster, row in autocorr_stats.iterrows(): + for metric_name, metric in zip(row.index, row.to_numpy()): + feature_uploader.stage_feature_value( + (metric_name, i_cluster), sample, metric) def _fetch_cells_and_phenotypes( - database_connection_maker: DatabaseConnectionMaker, + cursor: Psycopg2Cursor, study: str ) -> dict[str, DataFrame]: - study_data: dict[str, dict[str, DataFrame | str]] = FeatureMatrixExtractor.extract( - database_connection_maker, study=study)[study]['feature_matrices'] + extractor = FeatureMatrixExtractor(cursor) + study_data: dict[str, dict[str, DataFrame | str]] = extractor.extract( + study=study)[study]['feature_matrices'] return {specimen: packet['dataframe'] for specimen, packet in study_data.items()} diff --git a/spatialprofilingtoolbox/ondemand/counts_provider.py b/spatialprofilingtoolbox/ondemand/counts_provider.py deleted file mode 100644 index f15fd8431..000000000 --- a/spatialprofilingtoolbox/ondemand/counts_provider.py +++ /dev/null @@ -1,122 +0,0 @@ -""" -Do cell counting for a specific signature, over the specially-created -binary-format index. 
-""" -import sys -import re -import os -from os.path import join -import json - -from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger - -logger = colorized_logger(__name__) - - -class CountsProvider: - """Scan binary-format expression matrices for specific signatures.""" - - def __init__(self, data_directory): - self.load_expressions_indices(data_directory) - self.load_data_matrices(data_directory) - logger.info('CountsProvider is finished loading source data.') - - def load_expressions_indices(self, data_directory): - logger.debug('Searching for source data in: %s', data_directory) - json_files = [f for f in os.listdir(data_directory) if os.path.isfile( - join(data_directory, f)) and re.search(r'\.json$', f)] - if len(json_files) != 1: - logger.error('Did not find index JSON file.') - sys.exit(1) - with open(join(data_directory, json_files[0]), 'rt', encoding='utf-8') as file: - root = json.loads(file.read()) - entries = root[list(root.keys())[0]] - self.studies = {} - for entry in entries: - self.studies[entry['specimen measurement study name']] = entry - - def load_data_matrices(self, data_directory): - self.data_arrays = {} - for study_name in self.get_study_names(): - self.data_arrays[study_name] = { - item['specimen']: self.get_data_array_from_file( - join(data_directory, item['filename'])) - for item in self.studies[study_name]['expressions files'] - } - - def get_study_names(self): - return self.studies.keys() - - def get_data_array_from_file(self, filename): - data_array = [] - with open(filename, 'rb') as file: - buffer = None - while buffer != b'': - buffer = file.read(8) - data_array.append(int.from_bytes(buffer, 'little')) - return data_array - - def compute_signature(self, channel_names, study_name): - target_by_symbol = self.studies[study_name]['target by symbol'] - target_index_lookup = self.studies[study_name]['target index lookup'] - if not all(name in target_by_symbol.keys() for name in channel_names): - return None - identifiers = [target_by_symbol[name] for name in channel_names] - indices = [target_index_lookup[identifier] - for identifier in identifiers] - signature = 0 - for index in indices: - signature = signature + (1 << index) - return signature - - def count_structures_of_exact_signature(self, signature, study_name): - counts = {} - for specimen, data_array in self.data_arrays[study_name].items(): - count = 0 - for entry in data_array: - if entry == signature: - count = count + 1 - counts[specimen] = [count, len(data_array)] - return counts - - def count_structures_of_partial_signature(self, signature, study_name): - counts = {} - for specimen, data_array in self.data_arrays[study_name].items(): - count = 0 - for entry in data_array: - if entry | signature == entry: - count = count + 1 - counts[specimen] = [count, len(data_array)] - return counts - - def count_structures_of_partial_signed_signature( - self, positives_signature, negatives_signature, study_name): - counts = {} - for specimen, data_array in self.data_arrays[study_name].items(): - count = 0 - for entry in data_array: - if (entry | positives_signature == entry) and \ - (~entry | negatives_signature == ~entry): - count = count + 1 - counts[specimen] = [count, len(data_array)] - return counts - - def get_status(self): - return [ - { - 'study': study_name, - 'counts by channel': [ - { - 'channel symbol': symbol, - 'count': self.count_structures_of_partial_signed_signature( - [symbol], [], study_name), - } - for symbol in sorted(list(targets['target by symbol'].keys())) 
-                ],
-                'total number of cells': len(self.data_arrays[study_name]),
-            }
-            for study_name, targets in self.studies.items()
-        ]
-
-    def has_study(self, study_name):
-        return study_name in self.studies
diff --git a/spatialprofilingtoolbox/ondemand/providers/provider.py b/spatialprofilingtoolbox/ondemand/providers/provider.py
index ff12c294d..b7ec016b1 100644
--- a/spatialprofilingtoolbox/ondemand/providers/provider.py
+++ b/spatialprofilingtoolbox/ondemand/providers/provider.py
@@ -82,16 +82,6 @@ def get_study_names(self) -> list[str]:
         """Retrieve names of the studies held in memory."""
         return self.studies.keys()
 
-    # def get_data_array_from_file(self, filename: str) -> list[int]:
-    #     """Load data arrays from a precomputed JSON artifact."""
-    #     data_array = []
-    #     with open(filename, 'rb') as file:
-    #         buffer = None
-    #         while buffer != b'':
-    #             buffer = file.read(8)
-    #             data_array.append(int.from_bytes(buffer, 'little'))
-    #     return data_array
-
     def get_data_array_from_file(
         self,
         filename: str,
diff --git a/spatialprofilingtoolbox/ondemand/proximity_provider.py b/spatialprofilingtoolbox/ondemand/proximity_provider.py
deleted file mode 100644
index af303664d..000000000
--- a/spatialprofilingtoolbox/ondemand/proximity_provider.py
+++ /dev/null
@@ -1,368 +0,0 @@
-"""
-Do proximity calculation from pair of signatures.
-"""
-import sys
-import re
-from os import listdir
-from os.path import isfile
-from os.path import join
-import json
-from threading import Thread
-from datetime import datetime
-
-import pandas as pd
-import numpy as np
-from sklearn.neighbors import BallTree
-
-from spatialprofilingtoolbox.db.database_connection import DBCursor
-from spatialprofilingtoolbox.ondemand.phenotype_str import phenotype_str_to_phenotype
-from spatialprofilingtoolbox.ondemand.phenotype_str import phenotype_to_phenotype_str
-from spatialprofilingtoolbox.workflow.common.export_features import ADIFeatureSpecificationUploader
-from spatialprofilingtoolbox.workflow.common.export_features import add_feature_value
-from spatialprofilingtoolbox.workflow.common.proximity import \
-    describe_proximity_feature_derivation_method
-from spatialprofilingtoolbox.workflow.common.structure_centroids import StructureCentroids
-from spatialprofilingtoolbox.workflow.common.proximity import \
-    compute_proximity_metric_for_signature_pair
-from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger
-
-logger = colorized_logger(__name__)
-
-
-class ProximityProvider:
-    """Do proximity calculation from pair of signatures."""
-
-    def __init__(self, data_directory):
-        self.data_directory = data_directory
-        self.load_expressions_indices()
-
-        loader = StructureCentroids()
-        loader.load_from_file(self.get_data_directory())
-        centroids = loader.get_studies()
-
-        self.load_data_matrices(centroids)
-        logger.info('ProximityProvider is finished loading source expression data.')
-
-        logger.info('Start loading location data and creating ball trees.')
-        self.create_ball_trees(centroids)
-        logger.info('Finished creating ball trees.')
-
-    def get_data_directory(self):
-        return self.data_directory
-
-    def load_expressions_indices(self):
-        logger.debug('Searching for source data in: %s', self.get_data_directory())
-        json_files = [
-            f for f in listdir(self.get_data_directory())
-            if isfile(join(self.get_data_directory(), f)) and re.search(r'\.json$', f)
-        ]
-        if len(json_files) != 1:
-            logger.error('Did not find index JSON file.')
-            sys.exit(1)
-        index_file = join(self.get_data_directory(), json_files[0])
-        with open(index_file, 'rt', encoding='utf-8') as file:
-            root = json.loads(file.read())
-        entries = root[list(root.keys())[0]]
-        self.studies = {}
-        for entry in entries:
-            self.studies[entry['specimen measurement study name']] = entry
-
-    def load_data_matrices(self, centroids):
-        self.data_arrays = {}
-        for study_name in self.get_study_names():
-            self.data_arrays[study_name] = {
-                item['specimen']:
-                self.get_data_array_from_file(
-                    join(self.get_data_directory(), item['filename']),
-                    self.studies[study_name]['target index lookup'],
-                    self.studies[study_name]['target by symbol'],
-                    study_name,
-                    item['specimen'],
-                    centroids,
-                )
-                for item in self.studies[study_name]['expressions files']
-            }
-            shapes = [df.shape for df in self.data_arrays[study_name].values()]
-            logger.debug('Loaded dataframes of sizes %s', self.abbreviate_list(list(shapes)))
-            number_specimens = len(self.data_arrays[study_name])
-            specimens = self.data_arrays[study_name].keys()
-            logger.debug(
-                '%s specimens loaded (%s).',
-                number_specimens,
-                self.abbreviate_list(list(specimens)),
-            )
-
-    def abbreviate_list(self, items: list):
-        if len(items) > 5:
-            return items[0:5] + [f'... ({len(items)} total items)']
-        return items
-
-    def create_ball_trees(self, centroids):
-        self.trees = {
-            study_name: {
-                sample_identifier: BallTree(np.array(points_list))
-                for sample_identifier, points_list in points_lists.items()
-            }
-            for study_name, points_lists in centroids.items()
-        }
-
-    def get_study_names(self):
-        return self.studies.keys()
-
-    def get_data_array_from_file(self, filename, target_index_lookup, target_by_symbol, study_name,
-                                 sample, centroids):
-        rows = []
-        columns = self.list_columns(target_index_lookup, target_by_symbol)
-        with open(filename, 'rb') as file:
-            buffer = None
-            while True:
-                buffer = file.read(8)
-                if buffer == b'':
-                    break
-                binary_expression_64_string = ''.join([
-                    ''.join(list(reversed(bin(ii)[2:].rjust(8,'0'))))
-                    for ii in buffer
-                ])
-                truncated_to_channels = binary_expression_64_string[0:len(columns)]
-                row = [int(b) for b in list(truncated_to_channels)]
-                rows.append(row)
-        df = pd.DataFrame(rows, columns=columns)
-        df['pixel x'] = [point[0] for point in centroids[study_name][sample]]
-        df['pixel y'] = [point[1] for point in centroids[study_name][sample]]
-        return df
-
-    @staticmethod
-    def list_columns(target_index_lookup, target_by_symbol):
-        target_by_index = {value: key for key, value in target_index_lookup.items()}
-        symbol_by_target = {value: key for key, value in target_by_symbol.items()}
-        return [
-            symbol_by_target[target_by_index[i]]
-            for i in range(len(target_by_index))
-        ]
-
-    @staticmethod
-    def get_or_create_feature_specification(study_name, phenotype1, phenotype2, radius):
-        phenotype1_str = phenotype_to_phenotype_str(phenotype1)
-        phenotype2_str = phenotype_to_phenotype_str(phenotype2)
-        args = (
-            study_name,
-            phenotype1_str,
-            phenotype2_str,
-            str(radius),
-            describe_proximity_feature_derivation_method(),
-        )
-        with DBCursor() as cursor:
-            cursor.execute('''
-                SELECT
-                    fsn.identifier,
-                    fs.specifier
-                FROM feature_specification fsn
-                JOIN feature_specifier fs ON fs.feature_specification=fsn.identifier
-                JOIN study_component sc ON sc.component_study=fsn.study
-                JOIN study_component sc2 ON sc2.primary_study=sc.primary_study
-                WHERE sc2.component_study=%s AND
-                    ( (fs.specifier=%s AND fs.ordinality='1')
-                    OR (fs.specifier=%s AND fs.ordinality='2')
-                    OR (fs.specifier=%s AND fs.ordinality='3') ) AND
-                    fsn.derivation_method=%s
-                ;
-            ''', args)
-            rows = cursor.fetchall()
-        feature_specifications = {row[0]: [] for row in rows}
-        for row in rows:
-            feature_specifications[row[0]].append(row[1])
-        for key, specifiers in feature_specifications.items():
-            if len(specifiers) == 3:
-                return key
-        message = 'Creating feature with specifiers: (%s) %s, %s, %s'
-        logger.debug(message, study_name, phenotype1_str, phenotype2_str, radius)
-        specification = ProximityProvider.create_feature_specification(
-            study_name,
-            phenotype1_str,
-            phenotype2_str,
-            radius,
-        )
-        return specification
-
-    @staticmethod
-    def create_feature_specification(study_name, phenotype1, phenotype2, radius):
-        specifiers = [phenotype1, phenotype2, str(radius)]
-        method = describe_proximity_feature_derivation_method()
-        with DBCursor() as cursor:
-            Uploader = ADIFeatureSpecificationUploader
-            feature_specification = Uploader.add_new_feature(specifiers, method, study_name, cursor)
-        return feature_specification
-
-    @staticmethod
-    def is_already_computed(feature_specification):
-        expected = ProximityProvider.get_expected_number_of_computed_values(feature_specification)
-        actual = ProximityProvider.get_actual_number_of_computed_values(feature_specification)
-        if actual < expected:
-            return False
-        if actual == expected:
-            return True
-        message = 'Possibly too many computed values of the given type?'
-        raise ValueError(f'{message} Feature "{feature_specification}"')
-
-    @staticmethod
-    def get_expected_number_of_computed_values(feature_specification):
-        domain = ProximityProvider.get_expected_domain_for_computed_values(feature_specification)
-        number = len(domain)
-        logger.debug('Number of values possible to be computed: %s', number)
-        return number
-
-    @staticmethod
-    def get_expected_domain_for_computed_values(feature_specification):
-        with DBCursor() as cursor:
-            cursor.execute('''
-                SELECT DISTINCT sdmp.specimen FROM specimen_data_measurement_process sdmp
-                JOIN study_component sc1 ON sc1.component_study=sdmp.study
-                JOIN study_component sc2 ON sc1.primary_study=sc2.primary_study
-                JOIN feature_specification fsn ON fsn.study=sc2.component_study
-                WHERE fsn.identifier=%s
-                ;
-            ''', (feature_specification,))
-            rows = cursor.fetchall()
-        return [row[0] for row in rows]
-
-    @staticmethod
-    def get_actual_number_of_computed_values(feature_specification):
-        with DBCursor() as cursor:
-            cursor.execute('''
-                SELECT COUNT(*) FROM quantitative_feature_value qfv
-                WHERE qfv.feature=%s
-                ;
-            ''', (feature_specification,))
-            rows = cursor.fetchall()
-        logger.debug('Actual number computed: %s', rows[0][0])
-        return rows[0][0]
-
-    @staticmethod
-    def is_already_pending(feature_specification):
-        with DBCursor() as cursor:
-            cursor.execute('''
-                SELECT * FROM pending_feature_computation pfc
-                WHERE pfc.feature_specification=%s
-            ''', (feature_specification,))
-            rows = cursor.fetchall()
-        if len(rows) >=1:
-            return True
-        return False
-
-    @staticmethod
-    def retrieve_specifiers(feature_specification):
-        with DBCursor() as cursor:
-            cursor.execute('''
-                SELECT fs.specifier, fs.ordinality
-                FROM feature_specifier fs
-                WHERE fs.feature_specification=%s ;
-            ''', (feature_specification,))
-            rows = cursor.fetchall()
-            specifiers = [row[0] for row in sorted(rows, key=lambda row: int(row[1]))]
-            cursor.execute('''
-                SELECT sc2.component_study FROM feature_specification fs
-                JOIN study_component sc ON sc.component_study=fs.study
-                JOIN study_component sc2 ON sc.primary_study=sc2.primary_study
-                WHERE fs.identifier=%s AND
-                    sc2.component_study IN ( SELECT name FROM specimen_measurement_study )
-                ;
-            ''', (feature_specification,))
-            study = cursor.fetchall()[0][0]
-        return [
-            study,
-            phenotype_str_to_phenotype(specifiers[0]),
-            phenotype_str_to_phenotype(specifiers[1]),
-            float(specifiers[2]),
-        ]
-
-    def fork_computation_task(self, feature_specification):
-        background_thread = Thread(
-            target=self.do_proximity_metrics_one_feature,
-            args=(feature_specification,)
-        )
-        background_thread.start()
-
-    @staticmethod
-    def set_pending_computation(feature_specification):
-        time_str = datetime.now().ctime()
-        with DBCursor() as cursor:
-            cursor.execute('''
-                INSERT INTO pending_feature_computation (feature_specification, time_initiated)
-                VALUES (%s, %s) ;
-            ''', (feature_specification, time_str))
-
-    @staticmethod
-    def drop_pending_computation(feature_specification):
-        with DBCursor() as cursor:
-            cursor.execute('''
-                DELETE FROM pending_feature_computation pfc
-                WHERE pfc.feature_specification=%s ;
-            ''', (feature_specification, ))
-
-    def do_proximity_metrics_one_feature(self, feature_specification):
-        specifiers = ProximityProvider.retrieve_specifiers(feature_specification)
-        study_name, phenotype1, phenotype2, radius = specifiers
-        sample_identifiers = self.get_sample_identifiers(feature_specification)
-        for sample_identifier in sample_identifiers:
-            value = compute_proximity_metric_for_signature_pair(
-                phenotype1,
-                phenotype2,
-                radius,
-                self.get_cells(sample_identifier, study_name),
-                self.get_tree(sample_identifier, study_name),
-            )
-            message = 'Computed one feature value of %s: %s, %s'
-            logger.debug(message, feature_specification, sample_identifier, value)
-            with DBCursor() as cursor:
-                add_feature_value(feature_specification, sample_identifier, value, cursor)
-        ProximityProvider.drop_pending_computation(feature_specification)
-        message = 'Wrapped up proximity metric calculation, feature "%s".'
- logger.debug(message, feature_specification) - logger.debug('The samples considered were: %s', sample_identifiers) - - @staticmethod - def query_for_computed_feature_values(feature_specification, still_pending=False): - with DBCursor() as cursor: - cursor.execute(''' - SELECT qfv.subject, qfv.value - FROM quantitative_feature_value qfv - WHERE qfv.feature=%s - ''', (feature_specification,)) - rows = cursor.fetchall() - metrics = {row[0]: float(row[1]) if row[1] else None for row in rows} - return { - 'metrics': metrics, - 'pending': still_pending, - } - - def compute_metrics(self, study_name, phenotype1, phenotype2, radius): - logger.debug('Requesting computation.') - PP = ProximityProvider - feature_specification = PP.get_or_create_feature_specification( - study_name, phenotype1, phenotype2, radius) - if PP.is_already_computed(feature_specification): - is_pending=False - logger.debug('Already computed.') - else: - is_pending = PP.is_already_pending(feature_specification) - if is_pending: - logger.debug('Already already pending.') - else: - logger.debug('Not already pending.') - if not is_pending: - logger.debug('Starting background task.') - self.fork_computation_task(feature_specification) - PP.set_pending_computation(feature_specification) - logger.debug('Background task just started, is pending.') - is_pending = True - return PP.query_for_computed_feature_values(feature_specification, still_pending=is_pending) - - def get_cells(self, sample_identifier, study_name): - return self.data_arrays[study_name][sample_identifier] - - def get_tree(self, sample_identifier, study_name): - return self.trees[study_name][sample_identifier] - - @staticmethod - def get_sample_identifiers(feature_specification): - return ProximityProvider.get_expected_domain_for_computed_values(feature_specification)
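
Note on the reworked staging loop in squidpy_metrics.py above: _spatial_autocorr now returns the squidpy.gr.spatial_autocorr DataFrame whole, and the uploader stages one value per (metric column, row index) pair in place of the earlier placeholder zip over ('thing', 'thing'). Below is a minimal, self-contained sketch of that consumption pattern on synthetic data; the synthetic AnnData, the obs column names ('CD3', 'CD8', 'cluster'), and the print() stub standing in for ADIFeaturesUploader.stage_feature_value are illustrative assumptions, not part of the patch.

# Illustrative sketch only; not part of the patch.
import numpy as np
from anndata import AnnData
from squidpy.gr import spatial_autocorr, spatial_neighbors

rng = np.random.default_rng(0)
n_cells = 200
data = AnnData(
    X=rng.random((n_cells, 1)),                  # placeholder expression matrix
    obsm={'spatial': rng.random((n_cells, 2))},  # cell centroid coordinates
)
data.obs['CD3'] = rng.integers(0, 2, n_cells).astype(float)    # assumed channel
data.obs['CD8'] = rng.integers(0, 2, n_cells).astype(float)    # assumed channel
data.obs['cluster'] = rng.integers(0, 3, n_cells).astype(str)  # excluded from the metric, as in the patch

spatial_neighbors(data)    # build the spatial connectivity graph the metric needs
stats = spatial_autocorr(  # same arguments as _spatial_autocorr in the patch
    data,
    attr='obs',
    genes=data.obs.drop('cluster', axis=1).columns.tolist(),
    corr_method=None,
    copy=True,
)
# 'stats' is a DataFrame indexed by obs column name, with one autocorrelation
# metric per column (e.g. 'I' plus p-value estimates, depending on settings).
for i_cluster, row in stats.iterrows():  # variable names mirror the patch
    for metric_name, metric in zip(row.index, row.to_numpy()):
        print((metric_name, i_cluster), sample := 'sample-1', metric)  # stands in for stage_feature_value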