diff --git a/scripts/cleanup_database/README.md b/scripts/cleanup_database/README.md
new file mode 100644
index 000000000000..9160fbe57490
--- /dev/null
+++ b/scripts/cleanup_database/README.md
@@ -0,0 +1 @@
+Misc. scripts for cleaning up the database.
diff --git a/scripts/cleanup_database/__init__.py b/scripts/cleanup_database/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/scripts/cleanup_database/cleanup_histories.py b/scripts/cleanup_database/cleanup_histories.py
new file mode 100644
index 000000000000..bd2bf7df27dd
--- /dev/null
+++ b/scripts/cleanup_database/cleanup_histories.py
@@ -0,0 +1,7 @@
+from history_table_pruner import HistoryTablePruner
+
+if __name__ == "__main__":
+    # TODO setup args: batch_size, max_create_time, db_url
+    db_url = "get this from config"  # TODO
+    htp = HistoryTablePruner(db_url)
+    htp.run()
diff --git a/scripts/cleanup_database/cleanup_histories.sh b/scripts/cleanup_database/cleanup_histories.sh
new file mode 100755
index 000000000000..434b404906ea
--- /dev/null
+++ b/scripts/cleanup_database/cleanup_histories.sh
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+#######
+# TODO add description
+#######
+
+cd "$(dirname "$0")"/../.. || exit
+python ./scripts/cleanup_database/cleanup_histories.py "$@"
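Note: the TODO in cleanup_histories.py leaves argument handling unimplemented. A minimal argparse sketch of how the three planned options (batch_size, max_create_time, db_url) might be wired up; the flag names and defaults here are assumptions, not part of this diff:

    import argparse
    import datetime

    from history_table_pruner import HistoryTablePruner

    parser = argparse.ArgumentParser(description="Prune unused histories from the database.")
    parser.add_argument("db_url", help="SQLAlchemy database URL")  # hypothetical positional arg
    parser.add_argument("--batch-size", type=int, default=1000, help="history ids processed per batch")
    parser.add_argument(
        "--max-create-time",
        type=datetime.date.fromisoformat,
        default=None,  # None falls back to the pruner's default cutoff
        help="only delete histories created before this date (YYYY-MM-DD)",
    )
    args = parser.parse_args()
    HistoryTablePruner(args.db_url, batch_size=args.batch_size, max_create_time=args.max_create_time).run()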
diff --git a/scripts/cleanup_database/history_table_pruner.py b/scripts/cleanup_database/history_table_pruner.py
new file mode 100644
index 000000000000..aa4a06df3b84
--- /dev/null
+++ b/scripts/cleanup_database/history_table_pruner.py
@@ -0,0 +1,196 @@
+import datetime
+import os
+import sys
+
+sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, "lib")))
+
+import logging  # TODO more setup
+
+from sqlalchemy import (
+    create_engine,
+    text,
+)
+
+TMP_TABLE = "tmp_unused_history"
+
+ASSOC_TABLES = (
+    "event",
+    "history_audit",
+    "history_tag_association",
+    "history_annotation_association",
+    "history_rating_association",
+    "history_user_share_association",
+    "default_history_permissions",
+    "data_manager_history_association",
+    "cleanup_event_history_association",
+    "galaxy_session_to_history",
+)
+
+EXCLUDED_ASSOC_TABLES = (
+    "job_import_history_archive",
+    "job_export_history_archive",
+    "workflow_invocation",
+    "history_dataset_collection_association",
+    "job",
+    "history_dataset_association",
+)
+
+DEFAULT_BATCH_SIZE = 1000
+
+
+class HistoryTablePruner:
+    """Removes unused histories (user is null, hid_counter == 1)."""
+
+    def __init__(self, db_url, batch_size=None, max_create_time=None):
+        self.engine = create_engine(db_url)
+        self.batch_size = batch_size or DEFAULT_BATCH_SIZE
+        self.max_create_time = max_create_time or self._get_default_max_create_time()
+        self.min_id, self.max_id = self._get_min_max_ids()
+
+    def run(self):
+        """
+        Due to the very large size of some tables, we run operations in batches,
+        using low/high history id as boundaries.
+        """
+        if self.min_id is None:
+            logging.info("No histories exist")
+            return
+
+        low = self.min_id
+        high = min(self.max_id, low + self.batch_size)
+        while low <= self.max_id:
+            self._run_batch(low, high)
+            low = high
+            high = high + self.batch_size
+
+    def _get_default_max_create_time(self):
+        """By default, do not delete histories created less than a month ago."""
+        # A 30-day window avoids the invalid dates (e.g., month=0 in January)
+        # that date.replace(month=today.month - 1) would produce.
+        return datetime.date.today() - datetime.timedelta(days=30)
+
+    def _run_batch(self, low, high):
+        self._mark_histories_as_deleted_and_purged(low, high)
+        histories = self._get_histories(low, high)
+        exclude = self._get_histories_to_exclude(low, high)
+
+        # Calculate set of histories to delete.
+        to_delete = set(histories) - exclude
+        if not to_delete:
+            logging.info(f"No histories to delete in the id range {low}-{high}")
+            return
+
+        self._create_tmp_table()
+        try:
+            self._populate_tmp_table(to_delete)
+            self._delete_associations()
+            self._set_references_to_null()
+            self._delete_histories(low, high)
+        finally:
+            self._drop_tmp_table()
+
+    def _get_min_max_ids(self):
+        """Return min and max ids of histories that are candidates for deletion."""
+        stmt = text(
+            "SELECT min(id), max(id) FROM history WHERE user_id IS NULL AND hid_counter = 1 AND create_time < :create_time"
+        )
+        params = {"create_time": self.max_create_time}
+        with self.engine.begin() as conn:
+            minmax = conn.execute(stmt, params).all()
+        return minmax[0][0], minmax[0][1]
+
+    def _mark_histories_as_deleted_and_purged(self, low, high):
+        """Mark target histories as deleted and purged to prevent their further usage."""
+        logging.info(f"Marking histories {low}-{high} as deleted and purged")
+        stmt = text(
+            """
+            UPDATE history
+            SET deleted = TRUE, purged = TRUE
+            WHERE user_id IS NULL AND hid_counter = 1 AND create_time < :create_time AND id >= :low AND id < :high
+            """
+        )
+        params = self._get_stmt_params(low, high)
+        with self.engine.begin() as conn:
+            return conn.execute(stmt, params)
+
+    def _get_histories(self, low, high):
+        """Return ids of histories to delete."""
+        logging.info(f"Collecting history ids between {low}-{high}")
+        stmt = text(
+            "SELECT id FROM history WHERE user_id IS NULL AND hid_counter = 1 AND create_time < :create_time AND id >= :low AND id < :high"
+        )
+        params = self._get_stmt_params(low, high)
+        with self.engine.begin() as conn:
+            return conn.scalars(stmt, params).all()
+
+    def _get_histories_to_exclude(self, low, high):
+        """Retrieve histories that should NOT be deleted due to existence of associated records that should be preserved."""
+        logging.info(f"Collecting ids of histories to exclude based on {len(EXCLUDED_ASSOC_TABLES)} associated tables:")
+        statements = []
+        for table in EXCLUDED_ASSOC_TABLES:
+            statements.append(
+                (table, text(f"SELECT history_id FROM {table} WHERE history_id >= :low AND history_id < :high"))
+            )
+
+        params = self._get_stmt_params(low, high)
+        ids = []
+        for table, stmt in statements:
+            with self.engine.begin() as conn:
+                logging.info(f"\tCollecting history_id from {table}")
+                ids += conn.scalars(stmt, params).all()
+
+        excluded = set(ids)
+        if None in excluded:
+            excluded.remove(None)
+        return excluded
+
+    def _create_tmp_table(self):
+        """Create temporary table to hold history ids."""
+        stmt = text(f"CREATE TEMPORARY TABLE {TMP_TABLE} (id INT PRIMARY KEY)")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _drop_tmp_table(self):
+        stmt = text(f"DROP TABLE {TMP_TABLE}")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _populate_tmp_table(self, to_delete):
+        """Load ids of histories to delete into temporary table."""
+        assert to_delete
+        logging.info("Populating temporary table")
+        sql_values = ",".join([f"({id})" for id in to_delete])
+        stmt = text(f"INSERT INTO {TMP_TABLE} VALUES {sql_values}")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _delete_associations(self):
+        """Delete records associated with histories to be deleted."""
+        logging.info("Deleting associated records from ...")
+
+        for table in ASSOC_TABLES:
+            stmt = text(f"DELETE FROM {table} WHERE history_id IN (SELECT id FROM {TMP_TABLE})")
+            with self.engine.begin() as conn:
+                conn.execute(stmt)
+
+    def _set_references_to_null(self):
+        """Set current_history_id to null in the galaxy_session table for records referring to histories to be deleted."""
+        logging.info("Setting current_history_id to null in galaxy_session")
+        stmt = text(
+            f"UPDATE galaxy_session SET current_history_id = NULL WHERE current_history_id IN (SELECT id FROM {TMP_TABLE})"
+        )
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _delete_histories(self, low, high):
+        """Last step: delete histories that are safe to delete."""
+        logging.info(f"Deleting histories in the id range {low} - {high}")
+        stmt = text(f"DELETE FROM history WHERE id IN (SELECT id FROM {TMP_TABLE})")
+        with self.engine.begin() as conn:
+            conn.execute(stmt)
+
+    def _get_stmt_params(self, low, high):
+        params = {
+            "create_time": self.max_create_time,
+            "low": low,
+            "high": high,
+        }
+        return params
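Note: to make the batching in run() concrete, here is a small standalone sketch of how the id range is chopped into half-open intervals; the numbers are invented for illustration only:

    # With min_id=1, max_id=2500, batch_size=1000, _run_batch receives the
    # ranges [1, 1001), [1001, 2001), [2001, 3001).
    low, high = 1, min(2500, 1 + 1000)
    while low <= 2500:
        print(f"id >= {low} AND id < {high}")
        low = high
        high = high + 1000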
diff --git a/scripts/cleanup_database/test/__init__.py b/scripts/cleanup_database/test/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/scripts/cleanup_database/test/conftest.py b/scripts/cleanup_database/test/conftest.py
new file mode 100644
index 000000000000..35ca1c84d653
--- /dev/null
+++ b/scripts/cleanup_database/test/conftest.py
@@ -0,0 +1,338 @@
+import contextlib
+import random
+import string
+
+import pytest
+from sqlalchemy import (
+    create_engine,
+    text,
+)
+from sqlalchemy.orm import Session
+
+from galaxy import model as m
+
+# utility fixtures
+
+
+@contextlib.contextmanager
+def transaction(session):
+    if not session.in_transaction():
+        with session.begin():
+            yield
+    else:
+        yield
+
+
+@pytest.fixture(scope="module")
+def db_url():
+    # for postgresql the user must have privileges to execute the stmt in teardown()
+    return "sqlite:///:memory:"  # TODO make configurable
+
+
+@pytest.fixture(scope="module")
+def engine(db_url):
+    return create_engine(db_url)
+
+
+@pytest.fixture(autouse=True, scope="module")
+def setup(engine):
+    m.mapper_registry.metadata.create_all(engine)
+
+
+@pytest.fixture(autouse=True)
+def teardown(engine):
+    """Delete all rows from all tables. Called after each test."""
+    yield
+    with engine.begin() as conn:
+        for table in m.mapper_registry.metadata.tables:
+            stmt = text(f"ALTER TABLE {table} DISABLE TRIGGER ALL")  # disable fkey constraints to delete out of order
+            conn.execute(stmt)
+            stmt = text(f"DELETE FROM {table}")
+            conn.execute(stmt)
+
+
+@pytest.fixture
+def session(engine):
+    return Session(engine)
+
+
+# utility functions
+
+
+def random_str():
+    alphabet = string.ascii_lowercase + string.digits
+    size = random.randint(5, 10)
+    return "".join(random.choices(alphabet, k=size))
+
+
+def random_email():
+    text = random_str()
+    return f"{text}@galaxy.testing"
+
+
+# model fixture factories
+
+
+@pytest.fixture
+def make_event(session):
+    def f(**kwd):
+        model = m.Event(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_galaxy_session(session):
+    def f(**kwd):
+        model = m.GalaxySession(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history(session, make_user):
+    def f(**kwd):
+        if "user" not in kwd:
+            kwd["user"] = make_user()
+        model = m.History(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history_annotation_association(session):
+    def f(**kwd):
+        model = m.HistoryAnnotationAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history_tag_association(session):
+    def f(**kwd):
+        model = m.HistoryTagAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_job(session):
+    def f(**kwd):
+        model = m.Job(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_role(session):
+    def f(**kwd):
+        model = m.Role(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_user(session):
+    def f(**kwd):
+        if "username" not in kwd:
+            kwd["username"] = random_str()
+        if "email" not in kwd:
+            kwd["email"] = random_email()
+        if "password" not in kwd:
+            kwd["password"] = random_str()
+        model = m.User(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history_rating_association(session, make_user, make_history):
+    def f(**kwd):
+        if "user" not in kwd:
+            kwd["user"] = make_user()
+        if "item" not in kwd:
+            kwd["item"] = make_history()
+        model = m.HistoryRatingAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history_user_share_association(session):
+    def f(**kwd):
+        model = m.HistoryUserShareAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_default_history_permissions(session, make_history, make_role):
+    def f(**kwd):
+        if "history" not in kwd:
+            kwd["history"] = make_history()
+        if "action" not in kwd:
+            kwd["action"] = random_str()
+        if "role" not in kwd:
+            kwd["role"] = make_role()
+        model = m.DefaultHistoryPermissions(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_data_manager_history_association(session):
+    def f(**kwd):
+        model = m.DataManagerHistoryAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_cleanup_event_history_association(session):
+    def f(**kwd):
+        model = m.CleanupEventHistoryAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_galaxy_session_to_history_association(session, make_history, make_galaxy_session):
+    def f(**kwd):
+        if "galaxy_session" not in kwd:
+            kwd["galaxy_session"] = make_galaxy_session()
+        if "history" not in kwd:
+            kwd["history"] = make_history()
+        model = m.GalaxySessionToHistoryAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_job_import_history_archive(session):
+    def f(**kwd):
+        model = m.JobImportHistoryArchive(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_job_export_history_archive(session):
+    def f(**kwd):
+        model = m.JobExportHistoryArchive(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_workflow(session):
+    def f(**kwd):
+        model = m.Workflow(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_workflow_invocation(session, make_workflow):
+    def f(**kwd):
+        if "workflow" not in kwd:
+            kwd["workflow"] = make_workflow()
+        model = m.WorkflowInvocation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history_dataset_collection_association(session):
+    def f(**kwd):
+        model = m.HistoryDatasetCollectionAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
+
+
+@pytest.fixture
+def make_history_dataset_association(session):
+    def f(**kwd):
+        model = m.HistoryDatasetAssociation(**kwd)
+        with transaction(session):
+            session.add(model)
+            session.commit()
+        return model
+
+    return f
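Note: each make_* fixture above returns a factory function that instantiates one model, persists it, and returns it, so a test can request exactly the factories it needs. A hypothetical test using this pattern (the test name and assertions are illustrative, not part of this diff):

    def test_make_history_defaults(make_history):
        history = make_history()  # the factory supplies an owning user by default
        assert history.id is not None  # persisted and committed by the factory
        assert history.user is not None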
diff --git a/scripts/cleanup_database/test/test_history_table_pruner.py b/scripts/cleanup_database/test/test_history_table_pruner.py
new file mode 100644
index 000000000000..dbceb007715d
--- /dev/null
+++ b/scripts/cleanup_database/test/test_history_table_pruner.py
@@ -0,0 +1,166 @@
+import datetime
+
+import pytest
+from sqlalchemy import (
+    func,
+    select,
+    text,
+)
+
+from galaxy import model as m
+from ..history_table_pruner import HistoryTablePruner
+
+
+@pytest.fixture()
+def setup_db(
+    session,
+    make_user,
+    make_history,
+    make_event,
+    make_history_tag_association,
+    make_history_annotation_association,
+    make_history_rating_association,
+    make_history_user_share_association,
+    make_default_history_permissions,
+    make_data_manager_history_association,
+    make_cleanup_event_history_association,
+    make_galaxy_session_to_history_association,
+    make_job_import_history_archive,
+    make_job_export_history_archive,
+    make_workflow_invocation,
+    make_history_dataset_collection_association,
+    make_job,
+    make_history_dataset_association,
+    make_galaxy_session,
+):
+    # 1. Create 100 histories; make them deletable: user = null, hid_counter = 1.
+    histories = []
+    for id in range(100):
+        h = make_history(id=id)
+        h.user = None
+        h.hid_counter = 1
+        histories.append(h)
+
+    # 2. Set the first 10 histories as not deletable: hid_counter != 1.
+    for i in range(10):
+        histories[i].hid_counter = 42
+
+    # 3. Set the next 10 histories as not deletable: user not null.
+    u = make_user()
+    for i in range(10, 20):
+        histories[i].user = u
+
+    # 4. For the next 6 histories create associations that cannot be deleted.
+    make_job_import_history_archive(history=histories[20])
+    make_job_export_history_archive(history=histories[21])
+    make_workflow_invocation(history=histories[22])
+    make_history_dataset_collection_association(history=histories[23])
+    make_job().history = histories[24]
+    make_history_dataset_association(history=histories[25])
+
+    # 5. For the next 10 histories create associations that can be deleted.
+    make_event(history=histories[26])
+    make_history_tag_association(history=histories[27])
+    make_history_annotation_association(history=histories[28])
+    make_history_rating_association(item=histories[29])
+    make_history_user_share_association(history=histories[30])
+    make_default_history_permissions(history=histories[31])
+    make_data_manager_history_association(history=histories[32])
+    make_cleanup_event_history_association(history_id=histories[33].id)
+    make_galaxy_session_to_history_association(history=histories[34])
+    # HistoryAudit is not directly instantiable, so create the association manually.
+    stmt = text("insert into history_audit values(:history_id, :update_time)")
+    params = {"history_id": histories[35].id, "update_time": "01-01-2020"}
+    session.execute(stmt, params)
+
+    # 6. Create a galaxy_session record referring to a history.
+    # This cannot be deleted, but the history reference can be set to null.
+    make_galaxy_session(current_history=histories[36])
+
+    session.commit()
+
+    # TOTAL counts of loaded histories:
+    # histories that should NOT be deleted: 10 + 10 + 6 = 26
+    # histories that SHOULD be deleted: 100 - 26 = 74
+
+
+def test_script(setup_db, session, db_url):
+
+    def verify_counts(model, expected):
+        assert session.scalar(select(func.count()).select_from(model)) == expected
+
+    # 1. Verify history counts
+    stmt = select(m.History).order_by(m.History.id)
+    result = session.scalars(stmt).all()
+    assert len(result) == 100
+    for i, h in enumerate(result):
+        if i < 10:  # first 10
+            assert h.hid_counter > 1
+            assert h.user is None
+        elif i < 20:  # next 10
+            assert h.hid_counter == 1
+            assert h.user is not None
+        else:  # the rest
+            assert h.hid_counter == 1
+            assert h.user is None
+
+    # 2. Verify association counts
+    for model in [
+        m.JobImportHistoryArchive,
+        m.JobExportHistoryArchive,
+        m.WorkflowInvocation,
+        m.HistoryDatasetCollectionAssociation,
+        m.Job,
+        m.HistoryDatasetAssociation,
+        m.Event,
+        m.HistoryTagAssociation,
+        m.HistoryAnnotationAssociation,
+        m.HistoryRatingAssociation,
+        m.HistoryUserShareAssociation,
+        m.DefaultHistoryPermissions,
+        m.DataManagerHistoryAssociation,
+        m.CleanupEventHistoryAssociation,
+        m.GalaxySessionToHistoryAssociation,
+        m.HistoryAudit,
+    ]:
+        verify_counts(model, 1)
+    verify_counts(
+        m.GalaxySession, 2
+    )  # one extra session was automatically created for GalaxySessionToHistoryAssociation
+
+    # 3. Run the pruning script with a cutoff date in the future, so all candidate histories qualify.
+    today = datetime.date.today()
+    newdate = today.replace(year=today.year + 1)
+    HistoryTablePruner(db_url, max_create_time=newdate).run()
+
+    # 4. Verify new counts (for details on expected counts see comments in setup_db)
+
+    # 4.1 Verify new history counts
+    verify_counts(m.History, 26)
+
+    # 4.2 Verify new association counts: no change (these associations should NOT be deleted)
+    for model in [
+        m.JobImportHistoryArchive,
+        m.JobExportHistoryArchive,
+        m.WorkflowInvocation,
+        m.HistoryDatasetCollectionAssociation,
+        m.Job,
+        m.HistoryDatasetAssociation,
+    ]:
+        verify_counts(model, 1)
+    verify_counts(m.GalaxySession, 2)
+
+    # 4.3 Verify new association counts: deleted (these associations SHOULD be deleted)
+    for model in [
+        m.Event,
+        m.HistoryTagAssociation,
+        m.HistoryAnnotationAssociation,
+        m.HistoryRatingAssociation,
+        m.HistoryUserShareAssociation,
+        m.DefaultHistoryPermissions,
+        m.DataManagerHistoryAssociation,
+        m.CleanupEventHistoryAssociation,
+        m.GalaxySessionToHistoryAssociation,
+        m.HistoryAudit,
+    ]:
+        verify_counts(model, 0)
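Note: besides cleanup_histories.sh, the pruner could presumably also be driven directly from Python; the database URL below is a placeholder, not a value from this diff:

    from history_table_pruner import HistoryTablePruner

    # Prune unused histories (user is null, hid_counter == 1) older than the
    # default cutoff, using the default batch size of 1000 ids.
    HistoryTablePruner("postgresql://galaxy@localhost/galaxy").run()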