-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add db managements script to drop pending computatations (#196)
* Adding computed feature dropper. * Add logging statements to dropper. * Correct drop order.
- Loading branch information
1 parent
202249f
commit 03bbfe0
Showing
3 changed files
with
100 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
"""Drop ondemand-computed feature values, specifications, etc.""" | ||
|
||
from typing import cast | ||
import re | ||
|
||
from psycopg2.extensions import cursor as Psycopg2Cursor | ||
|
||
from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger | ||
|
||
logger = colorized_logger(__name__) | ||
|
||
|
||
class OnDemandComputationsDropper: | ||
"""Drop ondemand-computed feature values, specifications, etc.""" | ||
|
||
@staticmethod | ||
def drop(cursor: Psycopg2Cursor, pending_only: bool = False, drop_all: bool = False): | ||
specifications = cast(list[str], OnDemandComputationsDropper.get_droppable( | ||
cursor, | ||
pending_only=pending_only, | ||
drop_all=drop_all, | ||
)) | ||
OnDemandComputationsDropper.drop_features(cursor, specifications) | ||
|
||
@staticmethod | ||
def get_droppable( | ||
cursor: Psycopg2Cursor, | ||
pending_only: bool = False, | ||
drop_all: bool = False, | ||
) -> list[str] | None: | ||
if pending_only: | ||
cursor.execute('SELECT feature_specification FROM pending_feature_computation;') | ||
return [row[0] for row in cursor.fetchall()] | ||
if drop_all: | ||
cursor.execute('SELECT DISTINCT study FROM feature_specification;') | ||
studies = [row[0] for row in cursor.fetchall()] | ||
studies = [s for s in studies if re.search(r' ondemand computed features$', s)] | ||
specifications: list[str] = [] | ||
for study in studies: | ||
query = 'SELECT identifier FROM feature_specification WHERE study=%s ;' | ||
cursor.execute(query, (study,)) | ||
_specifications = [row[0] for row in cursor.fetchall()] | ||
specifications = specifications + _specifications | ||
return specifications | ||
return None | ||
|
||
@staticmethod | ||
def drop_features(cursor: Psycopg2Cursor, specifications: list[str]): | ||
for specification in specifications: | ||
queries = [ | ||
'DELETE FROM pending_feature_computation WHERE feature_specification=%s ;', | ||
'DELETE FROM quantitative_feature_value WHERE feature=%s ;', | ||
'DELETE FROM feature_specifier WHERE feature_specification=%s ;', | ||
'DELETE FROM feature_specification WHERE identifier=%s ;', | ||
] | ||
for query in queries: | ||
logger.debug(query, specification) | ||
cursor.execute(query, (specification,)) |
40 changes: 40 additions & 0 deletions
40
spatialprofilingtoolbox/db/scripts/drop_ondemand_computations.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
"""Utility to managed features computed on demand.""" | ||
import argparse | ||
|
||
from spatialprofilingtoolbox.db.ondemand_dropper import OnDemandComputationsDropper | ||
from spatialprofilingtoolbox.db.database_connection import get_and_validate_database_config | ||
from spatialprofilingtoolbox.workflow.common.cli_arguments import add_argument | ||
from spatialprofilingtoolbox import DBCursor | ||
|
||
from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger | ||
|
||
logger = colorized_logger('spt db status') | ||
|
||
def main(): | ||
parser = argparse.ArgumentParser( | ||
prog='spt db drop-ondemand-computations', | ||
description='Report basic health status of the given scstudies database.' | ||
) | ||
add_argument(parser, 'database config') | ||
group = parser.add_mutually_exclusive_group(required=True) | ||
group.add_argument( | ||
'--pending-only', | ||
action='store_true', | ||
default=False, | ||
) | ||
group.add_argument( | ||
'--all', | ||
action='store_true', | ||
default=False, | ||
) | ||
args = parser.parse_args() | ||
|
||
config_file = get_and_validate_database_config(args) | ||
with DBCursor(database_config_file=config_file) as cursor: | ||
if args.pending_only: | ||
OnDemandComputationsDropper.drop(cursor, pending_only=True) | ||
if args.all: | ||
OnDemandComputationsDropper.drop(cursor, drop_all=True) | ||
|
||
if __name__ == '__main__': | ||
main() |