-
Notifications
You must be signed in to change notification settings - Fork 119
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'ttalex/purge-timeseries' into purge-res…
…tore-timeseries
- Loading branch information
Showing
2 changed files
with
115 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import logging | ||
import argparse | ||
import uuid | ||
from datetime import datetime | ||
import emission.core.wrapper.user as ecwu | ||
import emission.core.get_database as edb | ||
import emission.core.wrapper.pipelinestate as ecwp | ||
import emission.core.wrapper.pipelinestate as ecwp | ||
import emission.storage.pipeline_queries as esp | ||
import pandas as pd | ||
|
||
|
||
# Defaults for the CSV backup that is written before any data is deleted
DEFAULT_DIR_NAME = "/tmp"
DEFAULT_FILE_PREFIX = "old_timeseries_"
|
||
def exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix):
    """Export a user's already-processed timeseries entries to a CSV file.

    Dumps every entry in the timeseries collection for `user_id` whose
    `metadata.write_ts` is strictly before `last_ts_run`, i.e. the same set
    of entries that purgeUserTimeseries subsequently deletes.

    :param user_id: uuid.UUID of the user whose data is exported
    :param last_ts_run: unix timestamp upper bound (exclusive) on write_ts
    :param dir_name: target directory for the CSV file
    :param file_prefix: filename prefix; full name is <prefix><user_id>.csv
    """
    # os.path.join instead of manual "/" concatenation so a trailing slash
    # in dir_name does not produce a double separator
    filename = os.path.join(dir_name, file_prefix + str(user_id) + ".csv")
    all_data = list(edb.get_timeseries_db().find(
        {"user_id": user_id, "metadata.write_ts": {"$lt": last_ts_run}}))
    all_df = pd.json_normalize(all_data)
    all_df.to_csv(filename)
    # lazy %-style args so formatting is skipped when INFO is disabled
    logging.info("Old timeseries data exported to %s", filename)
|
||
def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, file_prefix=DEFAULT_FILE_PREFIX, unsafe_ignore_save=False):
    """Delete all timeseries entries for a user that were written before the
    last successful CREATE_CONFIRMED_OBJECTS pipeline run, after first backing
    them up to CSV (unless unsafe_ignore_save is set).

    :param user_uuid: user uuid as a string; takes precedence over user_email
    :param user_email: user email, used to resolve the uuid when user_uuid is falsy
    :param dir_name: directory for the CSV backup
    :param file_prefix: filename prefix for the CSV backup
    :param unsafe_ignore_save: skip the CSV backup (deletion is irreversible)
    :raises SystemExit: when the user has no processed timeseries yet
    """
    if user_uuid:
        user_id = uuid.UUID(user_uuid)
    else:
        user_id = ecwu.User.fromEmail(user_email).uuid

    cstate = esp.get_current_state(user_id, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
    # NOTE(review): presumably get_current_state returns None when the pipeline
    # has never run for this user — guard before subscripting so we warn and
    # exit instead of crashing with a TypeError; confirm against esp
    last_ts_run = cstate['last_ts_run'] if cstate else None

    if not last_ts_run:
        logging.warning("No processed timeseries for user %s", user_id)
        # sys.exit instead of the interactive-only exit() builtin
        sys.exit(1)

    if unsafe_ignore_save:
        logging.warning("CSV export was ignored")
    else:
        exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix)

    res = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": {"$lt": last_ts_run}})
    logging.info("%d deleted entries since %s", res.deleted_count, datetime.fromtimestamp(last_ts_run))
|
||
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    parser = argparse.ArgumentParser(prog="purge_user_timeseries")

    # The caller must identify the user by exactly one of email or uuid
    user_group = parser.add_mutually_exclusive_group(required=True)
    user_group.add_argument("-e", "--user_email")
    user_group.add_argument("-u", "--user_uuid")

    parser.add_argument("-d", "--dir_name", default=DEFAULT_DIR_NAME,
                        help="Target directory for exported csv data (defaults to {})".format(DEFAULT_DIR_NAME))
    parser.add_argument("--file_prefix", default=DEFAULT_FILE_PREFIX,
                        help="File prefix for exported csv data (defaults to {})".format(DEFAULT_FILE_PREFIX))
    parser.add_argument("--unsafe_ignore_save", action='store_true',
                        help="Ignore csv export of deleted data (not recommended, this operation is definitive)")

    args = parser.parse_args()
    purgeUserTimeseries(args.user_uuid, args.user_email, args.dir_name,
                        args.file_prefix, args.unsafe_ignore_save)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
from future import standard_library | ||
standard_library.install_aliases() | ||
from builtins import * | ||
import os | ||
import tempfile | ||
import unittest | ||
import emission.tests.common as etc | ||
import emission.core.get_database as edb | ||
import emission.storage.pipeline_queries as esp | ||
import emission.core.wrapper.pipelinestate as ecwp | ||
from bin.purge_user_timeseries import purgeUserTimeseries | ||
|
||
|
||
class TestPurgeUserTimeseries(unittest.TestCase):
    """End-to-end test for purgeUserTimeseries against the
    shankari_2015-07-22 real-data snapshot."""

    def setUp(self):
        # Load a known dataset and run the intake pipeline so there is
        # processed (purgeable) timeseries data for self.testUUID
        etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22")
        etc.runIntakePipeline(self.testUUID)

    def testPurgeUserTimeseries(self):
        with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
            cstate = esp.get_current_state(self.testUUID, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
            last_ts_run = cstate['last_ts_run']
            self.assertTrue(last_ts_run > 0)

            # Check how much data there was before
            res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": {"$lt": last_ts_run}})
            self.assertEqual(res, 1906)

            # Run the purge function
            file_prefix = "some_fancy_prefix_"
            purgeUserTimeseries(str(self.testUUID), dir_name=tmpdirname, file_prefix=file_prefix)

            # Check how much data there is after
            res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": {"$lt": last_ts_run}})
            self.assertEqual(res, 0)

            # Check that data was properly saved (1906 lines of data + 1 header).
            # BUG FIX: the original assertTrue(f.readlines(), 1907) treated 1907
            # as the failure *message*, so the check passed for any non-empty
            # file; compare the actual line count instead.
            with open(os.path.join(tmpdirname, file_prefix + str(self.testUUID) + ".csv"), 'r') as f:
                self.assertEqual(len(f.readlines()), 1907)

    def tearDown(self):
        # Drop everything so each test run starts from a clean database
        etc.dropAllCollections(edb._get_current_db())
|
||
|
||
if __name__ == '__main__':
    # Configure logging before running so pipeline output is visible
    etc.configLogging()
    unittest.main()