diff --git a/pyproject.toml b/pyproject.toml
index cb6a9d194..218855666 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -126,6 +126,9 @@ markers = [
     Google credentials (skipped on GitHub CI) \
     """,
     """\
+    not_windows: tests that don't work on Windows machines
+    """,
+    """\
     schematic_api: marks tests covering \
     API functionality (skipped on regular GitHub CI test suite)
     """,
@@ -143,4 +146,4 @@ markers = [
     rule_benchmark: marks tests covering \
     validation rule benchmarking
     """
-]
+]
\ No newline at end of file
diff --git a/schematic/store/synapse.py b/schematic/store/synapse.py
index cdc93f434..93da6109f 100644
--- a/schematic/store/synapse.py
+++ b/schematic/store/synapse.py
@@ -7,7 +7,7 @@
 import logging
 import secrets
 from dataclasses import dataclass
-import tempfile
+import shutil
 
 # allows specifying explicit variable types
 from typing import Dict, List, Tuple, Sequence, Union
@@ -43,9 +43,9 @@
 from schematic_db.rdb.synapse_database import SynapseDatabase
 
 
-from schematic.utils.df_utils import update_df, load_df, col_in_dataframe, populate_df_col_with_another_col
+from schematic.utils.df_utils import update_df, load_df, col_in_dataframe
 from schematic.utils.validate_utils import comma_separated_list_regex, rule_in_rule_list
-from schematic.utils.general import entity_type_mapping, get_dir_size, convert_size, convert_gb_to_bytes, create_temp_folder
+from schematic.utils.general import entity_type_mapping, get_dir_size, convert_gb_to_bytes, create_temp_folder, check_synapse_cache_size, clear_synapse_cache
 from schematic.schemas.explorer import SchemaExplorer
 from schematic.schemas.generator import SchemaGenerator
 from schematic.store.base import BaseStorage
@@ -53,7 +53,7 @@
 from schematic.configuration.configuration import CONFIG
 
 
-from schematic.utils.general import profile
+from schematic.utils.general import profile, calculate_datetime
 
 
 logger = logging.getLogger("Synapse storage")
@@ -75,12 +75,16 @@ def _download_manifest_to_folder(self) -> File:
         """
         if "SECRETS_MANAGER_SECRETS" in os.environ:
             temporary_manifest_storage = "/var/tmp/temp_manifest_download"
+            # clear out all the existing manifests
+            if os.path.exists(temporary_manifest_storage):
+                shutil.rmtree(temporary_manifest_storage)
+            # create a new directory to store manifest
             if not os.path.exists(temporary_manifest_storage):
-                os.mkdir("/var/tmp/temp_manifest_download")
+                os.mkdir(temporary_manifest_storage)
+            # create temporary folders for storing manifests
             download_location = create_temp_folder(temporary_manifest_storage)
         else:
             download_location=CONFIG.manifest_folder
-
         manifest_data = self.syn.get(
             self.manifest_id,
             downloadLocation=download_location,
@@ -177,41 +181,34 @@ def __init__(
         Typical usage example:
             syn_store = SynapseStorage()
         """
-
+        # TODO: turn root_synapse_cache to a parameter in init
         self.syn = self.login(token, access_token)
         self.project_scope = project_scope
         self.storageFileview = CONFIG.synapse_master_fileview_id
         self.manifest = CONFIG.synapse_manifest_basename
+        self.root_synapse_cache = "/root/.synapseCache"
         self._query_fileview()
 
-    def _purge_synapse_cache(self, root_dir: str = "/var/www/.synapseCache/", maximum_storage_allowed_cache_gb=7):
+    def _purge_synapse_cache(self, maximum_storage_allowed_cache_gb=1):
         """
-        Purge synapse cache if it exceeds 7GB
+        Purge the Synapse cache if it exceeds a certain size. Defaults to 1 GB.
         Args:
-            root_dir: directory of the .synapseCache function
-            maximum_storage_allowed_cache_gb: the maximum storage allowed before purging cache. Default is 7 GB.
-
-        Returns:
-            if size of cache reaches a certain threshold (default is 7GB), return the number of files that get deleted
-            otherwise, return the total remaining space (assuming total ephemeral storage is 20GB on AWS )
+            maximum_storage_allowed_cache_gb: the maximum storage allowed before purging the cache. Defaults to 1 GB.
         """
         # try clearing the cache
         # scan a directory and check size of files
-        cache = self.syn.cache
-        if os.path.exists(root_dir):
+        if os.path.exists(self.root_synapse_cache):
             maximum_storage_allowed_cache_bytes = convert_gb_to_bytes(maximum_storage_allowed_cache_gb)
-            total_ephemeral_storag_gb = 20
-            total_ephemeral_storage_bytes = convert_gb_to_bytes(total_ephemeral_storag_gb)
-            nbytes = get_dir_size(root_dir)
-            # if 7 GB has already been taken, purge cache before 15 min
-            if nbytes >= maximum_storage_allowed_cache_bytes:
-                minutes_earlier = datetime.strftime(datetime.utcnow()- timedelta(minutes = 15), '%s')
-                num_of_deleted_files = cache.purge(before_date = int(minutes_earlier))
-                logger.info(f'{num_of_deleted_files} number of files have been deleted from {root_dir}')
+            nbytes = get_dir_size(self.root_synapse_cache)
+            dir_size_bytes = check_synapse_cache_size(directory=self.root_synapse_cache)
+            # if more than 1 GB is already taken, purge cached files not accessed within the last 15 minutes
+            if dir_size_bytes >= maximum_storage_allowed_cache_bytes:
+                num_of_deleted_files = clear_synapse_cache(self.syn.cache, minutes=15)
+                logger.info(f'{num_of_deleted_files} files have been deleted from {self.root_synapse_cache}')
             else:
-                remaining_space = total_ephemeral_storage_bytes - nbytes
-                converted_space = convert_size(remaining_space)
-                logger.info(f'Estimated {remaining_space} bytes (which is approximately {converted_space}) remained in ephemeral storage after calculating size of .synapseCache excluding OS')
+                # on AWS, the OS takes around 14-17% of our ephemeral storage (20 GiB)
+                # instead of guessing how much space is left, log the size of .synapseCache here
+                logger.info(f'The total size of .synapseCache is: {nbytes} bytes')
 
     def _query_fileview(self):
         self._purge_synapse_cache()
diff --git a/schematic/utils/general.py b/schematic/utils/general.py
index 59edf4243..8b7b62e35 100644
--- a/schematic/utils/general.py
+++ b/schematic/utils/general.py
@@ -1,17 +1,20 @@
 # allows specifying explicit variable types
-from typing import Any, Dict, Optional, Text
-import os
-import math
 import logging
+import math
+import os
 import pstats
+import subprocess
+import tempfile
 from cProfile import Profile
+from datetime import datetime, timedelta
 from functools import wraps
-
-import tempfile
+from typing import Union
 
 from synapseclient.core.exceptions import SynapseHTTPError
-from synapseclient.table import EntityViewSchema
 from synapseclient.entity import File, Folder, Project
+from synapseclient.table import EntityViewSchema
+
+import synapseclient.core.cache as cache
 
 
 logger = logging.getLogger(__name__)
@@ -57,24 +60,69 @@ def get_dir_size(path: str):
             total += get_dir_size(entry.path)
     return total
 
 
+def calculate_datetime(minutes: int, input_date: datetime, before_or_after: str = "before") -> datetime:
+    """Calculate the datetime a given number of minutes before or after an input datetime.
+
+    Args:
+        input_date (datetime): the datetime object provided by the user
+        minutes (int): number of minutes
+        before_or_after (str): defaults to "before". If "before", calculate x minutes before the input datetime; if "after", calculate x minutes after it.
+
+    Returns:
+        datetime: the result of the datetime calculation
+    """
+    if before_or_after=="before":
+        date_time_result = input_date - timedelta(minutes=minutes)
+    elif before_or_after=="after":
+        date_time_result = input_date + timedelta(minutes=minutes)
+    else:
+        raise ValueError("Invalid value. Use either 'before' or 'after'.")
+    return date_time_result
+
+
+def check_synapse_cache_size(directory='/root/.synapseCache') -> Union[float, int]:
+    """Use the du -sh command to calculate the size of .synapseCache.
-def convert_size(size_bytes: int):
-    """convert bytes to a human readable format
 
     Args:
-        size_bytes: total byte sizes
-    return: a string that indicates bytes in a different format
+        directory (str, optional): .synapseCache directory. Defaults to '/root/.synapseCache'
+
+    Returns:
+        float or int: size of the .synapseCache directory in bytes
     """
-    if size_bytes == 0:
-        return "0B"
-    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
-    # calculate the log of size (in bytes) to base 1024 and run it down to the nearest integer
-    index_int = int(math.floor(math.log(size_bytes, 1024)))
-    # return the value of 1024 raised to the power of index
-    power_cal = math.pow(1024, index_int)
-    # convert bytes to a different unit if applicable
-    size_bytes_converted = round(size_bytes / power_cal, 2)
-    return f"{size_bytes_converted} {size_name[index_int]})"
+    # Note: this command might fail on Windows. Since it is primarily run on AWS, that is acceptable.
+    command = ['du', '-sh', directory]
+    output = subprocess.run(command, capture_output=True).stdout.decode('utf-8')
+
+    # Parsing the output to extract the directory size
+    size = output.split('\t')[0]
+    if "K" in size:
+        size_in_kb = float(size.rstrip('K'))
+        byte_size = size_in_kb * 1000
+    elif "M" in size:
+        size_in_mb = float(size.rstrip('M'))
+        byte_size = size_in_mb * 1000000
+    elif "G" in size:
+        size_in_gb = float(size.rstrip('G'))
+        byte_size = convert_gb_to_bytes(size_in_gb)
+    elif "B" in size:
+        byte_size = float(size.rstrip('B'))
+    else:
+        logger.error('Cannot recognize the file size unit')
+    return byte_size
+
+def clear_synapse_cache(cache: cache.Cache, minutes: int) -> int:
+    """Clear Synapse cache files that have not been accessed within a given number of minutes.
+    Args:
+        cache: an object of synapseclient Cache.
+        minutes (int): files not accessed within this many minutes will be removed
+    Returns:
+        int: the number of files deleted
+    """
+    current_date = datetime.utcnow()
+    minutes_earlier = calculate_datetime(input_date=current_date, minutes=minutes, before_or_after="before")
+    num_of_deleted_files = cache.purge(before_date = minutes_earlier)
+    return num_of_deleted_files
 
 def convert_gb_to_bytes(gb: int):
     """convert gb to bytes
@@ -84,6 +132,7 @@ def convert_gb_to_bytes(gb: int):
     """
     return gb * 1024 * 1024 * 1024
 
+
 def entity_type_mapping(syn, entity_id):
     """
     Return the entity type of manifest
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 98fa3b63a..5fa9003f3 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,34 +1,36 @@
-import logging
 import json
+import logging
 import os
+import shutil
+import tempfile
+import time
+from datetime import datetime
+from unittest import mock
 
-import pandas as pd
 import numpy as np
+import pandas as pd
 import pytest
-
-import tempfile
-
+import synapseclient
+import synapseclient.core.cache as cache
 from pandas.testing import assert_frame_equal
 from synapseclient.core.exceptions import SynapseHTTPError
 
-from schematic.schemas.explorer import SchemaExplorer
-from schematic.schemas import df_parser
-from schematic.utils import general
-from schematic.utils import cli_utils
-from schematic.utils import io_utils
-from schematic.utils import df_utils
-from schematic.utils import validate_utils
-from schematic.exceptions import (
-    MissingConfigValueError,
-    MissingConfigAndArgumentValueError,
-)
 from schematic import LOADER
+from schematic.exceptions import (MissingConfigAndArgumentValueError,
+                                  MissingConfigValueError)
+from schematic.schemas import df_parser
+from schematic.schemas.explorer import SchemaExplorer
 from schematic.store.synapse import SynapseStorage
-from schematic.utils.general import entity_type_mapping
+from schematic.utils import (cli_utils, df_utils, general, io_utils,
+                             validate_utils)
+from schematic.utils.general import (calculate_datetime,
+                                     check_synapse_cache_size,
+                                     clear_synapse_cache, entity_type_mapping)
 
 logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger(__name__)
 
+IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS")
 
 @pytest.fixture
 def synapse_store():
@@ -39,8 +41,69 @@ def synapse_store():
     synapse_store = SynapseStorage()
     yield synapse_store
 
-
 class TestGeneral:
+    def test_clear_synapse_cache(self, tmp_path):
+        # define location of mock synapse cache
+        mock_synapse_cache_dir = tmp_path / ".synapseCache/"
+        mock_synapse_cache_dir.mkdir()
+        mock_sub_folder = mock_synapse_cache_dir / "123"
+        mock_sub_folder.mkdir()
+        mock_table_query_folder = mock_sub_folder / "456"
+        mock_table_query_folder.mkdir()
+
+        # create mock table query csv and a mock cache map
+        mock_synapse_table_query_csv = mock_table_query_folder / "mock_synapse_table_query.csv"
+        mock_synapse_table_query_csv.write_text("mock table query content")
+        mock_cache_map = mock_table_query_folder / ".cacheMap"
+        mock_cache_map.write_text(f"{mock_synapse_table_query_csv}: '2022-06-13T19:24:27.000Z'")
+
+        assert os.path.exists(mock_synapse_table_query_csv)
+
+        # since the Synapse Python client compares the last modified date with the before date,
+        # we have to create a small time gap here
+        time.sleep(1)
+
+        # clear cache
+        my_cache = cache.Cache(cache_root_dir=mock_synapse_cache_dir)
+        clear_synapse_cache(my_cache, minutes=0.0001)
+        # make sure that cache files are now gone
+        assert os.path.exists(mock_synapse_table_query_csv) == False
+        assert os.path.exists(mock_cache_map) == False
+
+    def test_calculate_datetime_before_minutes(self):
+        input_date = datetime.strptime("07/20/23 17:36:34", '%m/%d/%y %H:%M:%S')
+        minutes_before = calculate_datetime(input_date=input_date, minutes=10, before_or_after="before")
+        expected_result_date_before = datetime.strptime("07/20/23 17:26:34", '%m/%d/%y %H:%M:%S')
+        assert minutes_before == expected_result_date_before
+
+    def test_calculate_datetime_after_minutes(self):
+        input_date = datetime.strptime("07/20/23 17:36:34", '%m/%d/%y %H:%M:%S')
+        minutes_after = calculate_datetime(input_date=input_date, minutes=10, before_or_after="after")
+        expected_result_date_after = datetime.strptime("07/20/23 17:46:34", '%m/%d/%y %H:%M:%S')
+        assert minutes_after == expected_result_date_after
+
+    def test_calculate_datetime_raise_error(self):
+        with pytest.raises(ValueError):
+            input_date = datetime.strptime("07/20/23 17:36:34", '%m/%d/%y %H:%M:%S')
+            minutes = calculate_datetime(input_date=input_date, minutes=10, before_or_after="error")
+
+    # this test might fail on Windows machines
+    @pytest.mark.not_windows
+    def test_check_synapse_cache_size(self, tmp_path):
+        mock_synapse_cache_dir = tmp_path / ".synapseCache"
+        mock_synapse_cache_dir.mkdir()
+
+        mock_synapse_table_query_csv = mock_synapse_cache_dir / "mock_synapse_table_query.csv"
+        mock_synapse_table_query_csv.write_text("example file for calculating cache")
+
+        file_size = check_synapse_cache_size(mock_synapse_cache_dir)
+
+        # For some reason, the reported file size differs when running in GitHub Actions.
+        if IN_GITHUB_ACTIONS:
+            assert file_size == 8000
+        else:
+            assert file_size == 4000
+
     def test_find_duplicates(self):
 
         mock_list = ["foo", "bar", "foo"]
@@ -84,6 +147,7 @@ def test_download_manifest_to_temp_folder(self):
             path_dir = general.create_temp_folder(tmpdir)
             assert os.path.exists(path_dir)
 
+
 class TestCliUtils:
 
     def test_query_dict(self):