Merge pull request #1268 from Sage-Bionetworks/develop-fix-synapsecache-issue

[bug fix] Updated the .synapseCache location, updated the functions that calculate cache size, and cleared existing manifests before each download
linglp authored Aug 3, 2023
2 parents 757abd1 + 3dc7028 commit 388584f
Showing 4 changed files with 180 additions and 67 deletions.
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -126,6 +126,9 @@ markers = [
Google credentials (skipped on GitHub CI) \
""",
"""\
not_windows: tests that don't work on a Windows machine
""",
"""\
schematic_api: marks tests covering \
API functionality (skipped on regular GitHub CI test suite)
""",
@@ -143,4 +146,4 @@ markers = [
rule_benchmark: marks tests covering \
validation rule benchmarking
"""
]
]
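
For context, the new not_windows marker works like the other custom markers registered in this block: tests tagged with it can be excluded from a run with pytest's marker expression. A Windows user would typically invoke something like the following (shown for illustration only):

pytest -m "not not_windows"
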
53 changes: 25 additions & 28 deletions schematic/store/synapse.py
@@ -7,7 +7,7 @@
import logging
import secrets
from dataclasses import dataclass
import tempfile
import shutil

# allows specifying explicit variable types
from typing import Dict, List, Tuple, Sequence, Union
@@ -43,17 +43,17 @@
from schematic_db.rdb.synapse_database import SynapseDatabase


from schematic.utils.df_utils import update_df, load_df, col_in_dataframe, populate_df_col_with_another_col
from schematic.utils.df_utils import update_df, load_df, col_in_dataframe
from schematic.utils.validate_utils import comma_separated_list_regex, rule_in_rule_list
from schematic.utils.general import entity_type_mapping, get_dir_size, convert_size, convert_gb_to_bytes, create_temp_folder
from schematic.utils.general import entity_type_mapping, get_dir_size, convert_gb_to_bytes, create_temp_folder, check_synapse_cache_size, clear_synapse_cache
from schematic.schemas.explorer import SchemaExplorer
from schematic.schemas.generator import SchemaGenerator
from schematic.store.base import BaseStorage
from schematic.exceptions import MissingConfigValueError, AccessCredentialsError

from schematic.configuration.configuration import CONFIG

from schematic.utils.general import profile
from schematic.utils.general import profile, calculate_datetime

logger = logging.getLogger("Synapse storage")

Expand All @@ -75,12 +75,16 @@ def _download_manifest_to_folder(self) -> File:
"""
if "SECRETS_MANAGER_SECRETS" in os.environ:
temporary_manifest_storage = "/var/tmp/temp_manifest_download"
# clear out all the existing manifests
if os.path.exists(temporary_manifest_storage):
shutil.rmtree(temporary_manifest_storage)
# create a new directory to store manifest
if not os.path.exists(temporary_manifest_storage):
os.mkdir("/var/tmp/temp_manifest_download")
os.mkdir(temporary_manifest_storage)
# create temporary folders for storing manifests
download_location = create_temp_folder(temporary_manifest_storage)
else:
download_location=CONFIG.manifest_folder

manifest_data = self.syn.get(
self.manifest_id,
downloadLocation=download_location,
@@ -177,41 +181,34 @@ def __init__(
Typical usage example:
syn_store = SynapseStorage()
"""

# TODO: turn root_synapse_cache to a parameter in init
self.syn = self.login(token, access_token)
self.project_scope = project_scope
self.storageFileview = CONFIG.synapse_master_fileview_id
self.manifest = CONFIG.synapse_manifest_basename
self.root_synapse_cache = "/root/.synapseCache"
self._query_fileview()

def _purge_synapse_cache(self, root_dir: str = "/var/www/.synapseCache/", maximum_storage_allowed_cache_gb=7):
def _purge_synapse_cache(self, maximum_storage_allowed_cache_gb=1):
"""
Purge synapse cache if it exceeds 7GB
Purge the Synapse cache if it exceeds a certain size. Defaults to 1 GB.
Args:
root_dir: directory of the .synapseCache function
maximum_storage_allowed_cache_gb: the maximum storage allowed before purging cache. Default is 7 GB.
Returns:
if size of cache reaches a certain threshold (default is 7GB), return the number of files that get deleted
otherwise, return the total remaining space (assuming total ephemeral storage is 20GB on AWS )
maximum_storage_allowed_cache_gb: the maximum storage allowed before purging cache. Default is 1 GB.
"""
# try clearing the cache
# scan a directory and check size of files
cache = self.syn.cache
if os.path.exists(root_dir):
if os.path.exists(self.root_synapse_cache):
maximum_storage_allowed_cache_bytes = convert_gb_to_bytes(maximum_storage_allowed_cache_gb)
total_ephemeral_storag_gb = 20
total_ephemeral_storage_bytes = convert_gb_to_bytes(total_ephemeral_storag_gb)
nbytes = get_dir_size(root_dir)
# if 7 GB has already been taken, purge cache before 15 min
if nbytes >= maximum_storage_allowed_cache_bytes:
minutes_earlier = datetime.strftime(datetime.utcnow()- timedelta(minutes = 15), '%s')
num_of_deleted_files = cache.purge(before_date = int(minutes_earlier))
logger.info(f'{num_of_deleted_files} number of files have been deleted from {root_dir}')
nbytes = get_dir_size(self.root_synapse_cache)
dir_size_bytes = check_synapse_cache_size(directory=self.root_synapse_cache)
# if more than 1 GB is already used, purge cache files older than 15 minutes
if dir_size_bytes >= maximum_storage_allowed_cache_bytes:
num_of_deleted_files = clear_synapse_cache(self.syn.cache, minutes=15)
logger.info(f'{num_of_deleted_files} files have been deleted from {self.root_synapse_cache}')
else:
remaining_space = total_ephemeral_storage_bytes - nbytes
converted_space = convert_size(remaining_space)
logger.info(f'Estimated {remaining_space} bytes (which is approximately {converted_space}) remained in ephemeral storage after calculating size of .synapseCache excluding OS')
# on AWS, OS takes around 14-17% of our ephemeral storage (20GiB)
# instead of guessing how much space we have left, log the total size of .synapseCache here
logger.info(f'the total size of .synapseCache is: {nbytes} bytes')

def _query_fileview(self):
self._purge_synapse_cache()
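
The manifest-clearing change above amounts to the sketch below: a minimal illustration, not the exact method, with reset_manifest_download_dir being a hypothetical name; create_temp_folder is the helper imported from schematic.utils.general in this file. The cache-purge side of the change is sketched after the general.py diff further down.

import os
import shutil

from schematic.utils.general import create_temp_folder

def reset_manifest_download_dir(base_dir="/var/tmp/temp_manifest_download"):
    # wipe any manifests left over from a previous download
    if os.path.exists(base_dir):
        shutil.rmtree(base_dir)
    # recreate the base directory and hand back a fresh temporary sub-folder
    os.mkdir(base_dir)
    return create_temp_folder(base_dir)
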
89 changes: 69 additions & 20 deletions schematic/utils/general.py
@@ -1,17 +1,20 @@
# allows specifying explicit variable types
from typing import Any, Dict, Optional, Text
import os
import math
import logging
import math
import os
import pstats
import subprocess
import tempfile
from cProfile import Profile
from datetime import datetime, timedelta
from functools import wraps

import tempfile
from typing import Union

from synapseclient.core.exceptions import SynapseHTTPError
from synapseclient.table import EntityViewSchema
from synapseclient.entity import File, Folder, Project
from synapseclient.table import EntityViewSchema

import synapseclient.core.cache as cache

logger = logging.getLogger(__name__)

@@ -57,24 +60,69 @@ def get_dir_size(path: str):
total += get_dir_size(entry.path)
return total

def calculate_datetime(minutes: int, input_date: datetime, before_or_after: str = "before") -> datetime:
"""calculate date time
Args:
input_date (datetime): date time object provided by users
minutes (int): number of minutes
before_or_after (str): defaults to "before". If "before", calculate x minutes before the input datetime; if "after", calculate x minutes after it.
Returns:
datetime: the result of the datetime calculation
"""
if before_or_after=="before":
date_time_result = input_date - timedelta(minutes=minutes)
elif before_or_after=="after":
date_time_result = input_date + timedelta(minutes=minutes)
else:
raise ValueError("Invalid value. Use either 'before' or 'after'.")
return date_time_result


def check_synapse_cache_size(directory='/root/.synapseCache')-> Union[float, int]:
"""use du --sh command to calculate size of .synapseCache.
def convert_size(size_bytes: int):
"""convert bytes to a human readable format
Args:
size_bytes: total byte sizes
return: a string that indicates bytes in a different format
directory (str, optional): .synapseCache directory. Defaults to '/root/.synapseCache'
Returns:
float or integer: size of the .synapseCache directory in bytes
"""
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
# calculate the log of size (in bytes) to base 1024 and run it down to the nearest integer
index_int = int(math.floor(math.log(size_bytes, 1024)))
# return the value of 1024 raised to the power of index
power_cal = math.pow(1024, index_int)
# convert bytes to a different unit if applicable
size_bytes_converted = round(size_bytes / power_cal, 2)
return f"{size_bytes_converted} {size_name[index_int]})"
# Note: this command might fail for Windows users, but since it is primarily run on AWS, that is acceptable.
command = ['du', '-sh', directory]
output = subprocess.run(command, capture_output=True).stdout.decode('utf-8')

# Parsing the output to extract the directory size
size = output.split('\t')[0]
if "K" in size:
size_in_kb = float(size.rstrip('K'))
byte_size = size_in_kb * 1000
elif "M" in size:
size_in_mb = float(size.rstrip('M'))
byte_size = size_in_mb * 1000000
elif "G" in size:
size_in_gb = float(size.rstrip('G'))
byte_size = convert_gb_to_bytes(size_in_gb)
elif "B" in size:
byte_size = float(size.rstrip('B'))
else:
logger.error('Cannot recognize the file size unit')
return byte_size

def clear_synapse_cache(cache: cache.Cache, minutes: int) -> int:
"""clear synapse cache before a certain time
Args:
cache: an object of synapseclient Cache.
minutes (int): files last modified before this many minutes ago will be removed
Returns:
int: number of files that get deleted
"""
current_date = datetime.utcnow()
minutes_earlier = calculate_datetime(input_date=current_date, minutes=minutes, before_or_after="before")
num_of_deleted_files = cache.purge(before_date = minutes_earlier)
return num_of_deleted_files

def convert_gb_to_bytes(gb: int):
"""convert gb to bytes
@@ -84,6 +132,7 @@ def convert_gb_to_bytes(gb: int):
"""
return gb * 1024 * 1024 * 1024


def entity_type_mapping(syn, entity_id):
"""
Return the entity type of manifest
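
Putting the new helpers together, the purge decision made in SynapseStorage._purge_synapse_cache above can be sketched roughly as follows; purge_synapse_cache_if_needed is a hypothetical wrapper, and the 1 GB threshold and 15-minute cutoff mirror the defaults in synapse.py. Internally, clear_synapse_cache uses calculate_datetime to turn the minutes argument into a cutoff of the current UTC time minus that many minutes.

import os

from synapseclient import Synapse

from schematic.utils.general import (check_synapse_cache_size,
                                     clear_synapse_cache,
                                     convert_gb_to_bytes)

def purge_synapse_cache_if_needed(syn: Synapse,
                                  root_synapse_cache: str = "/root/.synapseCache",
                                  maximum_storage_allowed_cache_gb: int = 1) -> int:
    # nothing to purge if the cache directory does not exist (e.g. on a local run)
    if not os.path.exists(root_synapse_cache):
        return 0
    threshold_bytes = convert_gb_to_bytes(maximum_storage_allowed_cache_gb)
    dir_size_bytes = check_synapse_cache_size(directory=root_synapse_cache)
    if dir_size_bytes >= threshold_bytes:
        # drop cache entries last modified more than 15 minutes ago
        return clear_synapse_cache(syn.cache, minutes=15)
    return 0
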
100 changes: 82 additions & 18 deletions tests/test_utils.py
@@ -1,34 +1,36 @@
import logging
import json
import logging
import os
import shutil
import tempfile
import time
from datetime import datetime
from unittest import mock

import pandas as pd
import numpy as np
import pandas as pd
import pytest

import tempfile

import synapseclient
import synapseclient.core.cache as cache
from pandas.testing import assert_frame_equal
from synapseclient.core.exceptions import SynapseHTTPError

from schematic.schemas.explorer import SchemaExplorer
from schematic.schemas import df_parser
from schematic.utils import general
from schematic.utils import cli_utils
from schematic.utils import io_utils
from schematic.utils import df_utils
from schematic.utils import validate_utils
from schematic.exceptions import (
MissingConfigValueError,
MissingConfigAndArgumentValueError,
)
from schematic import LOADER
from schematic.exceptions import (MissingConfigAndArgumentValueError,
MissingConfigValueError)
from schematic.schemas import df_parser
from schematic.schemas.explorer import SchemaExplorer
from schematic.store.synapse import SynapseStorage
from schematic.utils.general import entity_type_mapping
from schematic.utils import (cli_utils, df_utils, general, io_utils,
validate_utils)
from schematic.utils.general import (calculate_datetime,
check_synapse_cache_size,
clear_synapse_cache, entity_type_mapping)

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS")

@pytest.fixture
def synapse_store():
@@ -39,8 +41,69 @@ def synapse_store():
synapse_store = SynapseStorage()
yield synapse_store


class TestGeneral:
def test_clear_synapse_cache(self, tmp_path):
# define location of mock synapse cache
mock_synapse_cache_dir = tmp_path / ".synapseCache/"
mock_synapse_cache_dir.mkdir()
mock_sub_folder = mock_synapse_cache_dir / "123"
mock_sub_folder.mkdir()
mock_table_query_folder = mock_sub_folder/ "456"
mock_table_query_folder.mkdir()

# create mock table query csv and a mock cache map
mock_synapse_table_query_csv = mock_table_query_folder/ "mock_synapse_table_query.csv"
mock_synapse_table_query_csv.write_text("mock table query content")
mock_cache_map = mock_table_query_folder/ ".cacheMap"
mock_cache_map.write_text(f"{mock_synapse_table_query_csv}: '2022-06-13T19:24:27.000Z'")

assert os.path.exists(mock_synapse_table_query_csv)

# since the Synapse Python client compares the last modified date with the before date,
# we have to create a small time gap here
time.sleep(1)

# clear cache
my_cache = cache.Cache(cache_root_dir=mock_synapse_cache_dir)
clear_synapse_cache(my_cache, minutes=0.0001)
# make sure that cache files are now gone
assert os.path.exists(mock_synapse_table_query_csv) == False
assert os.path.exists(mock_cache_map) == False

def test_calculate_datetime_before_minutes(self):
input_date = datetime.strptime("07/20/23 17:36:34", '%m/%d/%y %H:%M:%S')
minutes_before = calculate_datetime(input_date=input_date, minutes=10, before_or_after="before")
expected_result_date_before = datetime.strptime("07/20/23 17:26:34", '%m/%d/%y %H:%M:%S')
assert minutes_before == expected_result_date_before

def test_calculate_datetime_after_minutes(self):
input_date = datetime.strptime("07/20/23 17:36:34", '%m/%d/%y %H:%M:%S')
minutes_after = calculate_datetime(input_date=input_date, minutes=10, before_or_after="after")
expected_result_date_after = datetime.strptime("07/20/23 17:46:34", '%m/%d/%y %H:%M:%S')
assert minutes_after == expected_result_date_after

def test_calculate_datetime_raise_error(self):
with pytest.raises(ValueError):
input_date = datetime.strptime("07/20/23 17:36:34", '%m/%d/%y %H:%M:%S')
minutes = calculate_datetime(input_date=input_date, minutes=10, before_or_after="error")

# this test might fail on Windows machines
@pytest.mark.not_windows
def test_check_synapse_cache_size(self,tmp_path):
mock_synapse_cache_dir = tmp_path / ".synapseCache"
mock_synapse_cache_dir.mkdir()

mock_synapse_table_query_csv = mock_synapse_cache_dir/ "mock_synapse_table_query.csv"
mock_synapse_table_query_csv.write_text("example file for calculating cache")

file_size = check_synapse_cache_size(mock_synapse_cache_dir)

# For some reason, when running in GitHub Actions, the file size changes.
if IN_GITHUB_ACTIONS:
assert file_size == 8000
else:
assert file_size == 4000

def test_find_duplicates(self):

mock_list = ["foo", "bar", "foo"]
@@ -84,6 +147,7 @@ def test_download_manifest_to_temp_folder(self):
path_dir = general.create_temp_folder(tmpdir)
assert os.path.exists(path_dir)


class TestCliUtils:
def test_query_dict(self):

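
As a quick sanity check, the tests added here can be run on their own; the command below is illustrative and selects the new cache and datetime tests by keyword:

pytest tests/test_utils.py::TestGeneral -k "synapse_cache or calculate_datetime"

On a Windows machine, appending -m "not not_windows" additionally deselects the du-based size test, using the marker registered in pyproject.toml above.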

