From ae6eae62da28b9535035f4d23ff22f92646bbdcf Mon Sep 17 00:00:00 2001
From: "Mahadik, Mukul Chandrakant"
Date: Mon, 8 Jan 2024 11:50:17 -0700
Subject: [PATCH] Storing data as JSON + Restore code added

Choosing JSON instead of CSV since:
1. CSV does not preserve the nested, dict-like structure of MongoDB
   documents.
2. CSV also stores redundant empty / NaN columns for fields that only
   some documents contain.
---
 bin/purge_user_timeseries.py                  | 26 ++++++++++++-----
 bin/restore_user_timeseries.py                | 23 +++++++++++++
 .../tests/binTests/TestPurgeUserTimeseries.py | 11 ++++---
 3 files changed, 50 insertions(+), 10 deletions(-)
 create mode 100644 bin/restore_user_timeseries.py

diff --git a/bin/purge_user_timeseries.py b/bin/purge_user_timeseries.py
index e88968ae2..89f59de5a 100644
--- a/bin/purge_user_timeseries.py
+++ b/bin/purge_user_timeseries.py
@@ -8,15 +8,26 @@
 import emission.core.wrapper.pipelinestate as ecwp
 import emission.storage.pipeline_queries as esp
-import pandas as pd
+from bson import ObjectId
+import json
+from uuid import UUID
 
 DEFAULT_DIR_NAME = "/tmp"
 DEFAULT_FILE_PREFIX = "old_timeseries_"
 
-def exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix):
-    filename = dir_name + "/" + file_prefix + str(user_id) + ".csv"
+def exportOldTimeseriesAsJson(user_id, last_ts_run, dir_name, file_prefix):
+    filename = dir_name + "/" + file_prefix + str(user_id) + ".json"
     all_data = list(edb.get_timeseries_db().find({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}}))
-    all_df = pd.json_normalize(all_data)
-    all_df.to_csv(filename)
+
+    # UUIDs and ObjectIds in the raw documents are not JSON-serializable,
+    # so stringify them; everything else is dumped unchanged
+    def custom_encoder(obj):
+        if isinstance(obj, (UUID, ObjectId)):
+            return str(obj)
+        raise TypeError(f"Type {type(obj)} not serializable")
+
+    with open(filename, 'w') as file:
+        json.dump(all_data, file, default=custom_encoder)
+
     logging.info("Old timeseries data exported to {}".format(filename))
 
 def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, file_prefix=DEFAULT_FILE_PREFIX, unsafe_ignore_save=False):
@@ -28,8 +39,9 @@ def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, file_prefix=DEFAULT_FILE_PREFIX, unsafe_ignore_save=False):
     cstate = esp.get_current_state(user_id, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
     last_ts_run = cstate['last_ts_run']
+    logging.debug("last_ts_run : {}".format(last_ts_run))
 
     if not last_ts_run:
-        logging.warning("No processed timeserie for user {}".format(user_id))
+        logging.warning("No processed timeseries for user {}".format(user_id))
         exit(1)
 
     if unsafe_ignore_save is True:
@@ -38,8 +50,8 @@ def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, file_prefix=DEFAULT_FILE_PREFIX, unsafe_ignore_save=False):
     else:
-        exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix)
+        exportOldTimeseriesAsJson(user_id, last_ts_run, dir_name, file_prefix)
 
     res = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}})
     logging.info("{} deleted entries since {}".format(res.deleted_count, datetime.fromtimestamp(last_ts_run)))
 
 if __name__ == '__main__':
     logging.basicConfig(level=logging.DEBUG)
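Why the custom default= hook above is needed: Python's json module refuses to serialize bson.ObjectId and uuid.UUID, but it only invokes default= for types it cannot handle natively, so the nested metadata/data dicts survive intact while only the IDs are stringified. A minimal standalone sketch (the sample document and its values are hypothetical):

    import json
    from uuid import UUID
    from bson import ObjectId

    # Shaped like a timeseries entry; values are made up for illustration
    doc = {
        "_id": ObjectId("65a01234abcd5678ef901234"),
        "user_id": UUID("12345678-1234-5678-1234-567812345678"),
        "metadata": {"key": "background/location", "write_ts": 1704700000.0},
        "data": {"latitude": 32.23, "longitude": -110.95},
    }

    # json.dumps(doc) would raise:
    #   TypeError: Object of type ObjectId is not JSON serializable
    def custom_encoder(obj):
        if isinstance(obj, (UUID, ObjectId)):
            return str(obj)
        raise TypeError(f"Type {type(obj)} not serializable")

    # Nested dicts are preserved; only _id and user_id become strings
    print(json.dumps(doc, default=custom_encoder))
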
diff --git a/bin/restore_user_timeseries.py b/bin/restore_user_timeseries.py
new file mode 100644
index 000000000..3602f91d1
--- /dev/null
+++ b/bin/restore_user_timeseries.py
@@ -0,0 +1,23 @@
+import logging
+import argparse
+import json
+import emission.core.get_database as edb
+
+def restoreUserTimeseries(filename):
+    with open(filename, 'r') as file:
+        data = json.load(file)
+    result = edb.get_timeseries_db().insert_many(data)
+
+    logging.info("{} documents successfully inserted".format(len(result.inserted_ids)))
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.DEBUG)
+
+    parser = argparse.ArgumentParser(prog="restore_user_timeseries")
+    parser.add_argument(
+        "-f", "--file_name",
+        help="Path to the JSON file containing data to be imported"
+    )
+
+    args = parser.parse_args()
+    restoreUserTimeseries(args.file_name)
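One caveat worth flagging on the restore path: since the export stringifies _id and user_id, insert_many as written re-inserts them as plain strings, so queries that filter on a typed user_id would not match the restored documents. A possible follow-up sketch, not part of this patch (decode_document is a hypothetical helper name; whether insert_many accepts uuid.UUID values depends on the collection's UUID representation settings):

    import json
    from uuid import UUID
    from bson import ObjectId

    # Hypothetical helper: undo the str() conversion done at export time
    def decode_document(doc):
        if "_id" in doc:
            doc["_id"] = ObjectId(doc["_id"])
        if "user_id" in doc:
            doc["user_id"] = UUID(doc["user_id"])
        return doc

    filename = "/tmp/old_timeseries_<user_uuid>.json"  # placeholder path
    with open(filename, 'r') as file:
        data = [decode_document(d) for d in json.load(file)]
    # then: edb.get_timeseries_db().insert_many(data)
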
diff --git a/emission/tests/binTests/TestPurgeUserTimeseries.py b/emission/tests/binTests/TestPurgeUserTimeseries.py
index bb393a0bb..6027ef6d2 100644
--- a/emission/tests/binTests/TestPurgeUserTimeseries.py
+++ b/emission/tests/binTests/TestPurgeUserTimeseries.py
@@ -17,26 +17,31 @@ def setUp(self):
         etc.runIntakePipeline(self.testUUID)
 
     def testPurgeUserTimeseries(self):
         with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
             cstate = esp.get_current_state(self.testUUID, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
             last_ts_run = cstate['last_ts_run']
             self.assertTrue(last_ts_run > 0)
 
             # Check how much data there was before
             res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
+            print(f"About to purge {res} entries")
             self.assertEqual(res, 1906)
 
             # Run the purge function
             file_prefix = "some_fancy_prefix_"
             purgeUserTimeseries(str(self.testUUID), dir_name=tmpdirname, file_prefix=file_prefix)
 
             # Check how much data there is after
             res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
+            print(f"Purging complete: {res} entries remaining")
             self.assertEqual(res, 0)
 
-            # Check that data was properly saved (1906 lines of data + 1 line of header)
-            with open(tmpdirname + "/" + file_prefix + str(self.testUUID) + ".csv", 'r') as f:
-                self.assertTrue(f.readlines(), 1907)
+            # Check that all 1906 entries were saved to the JSON dump
+            import json
+            with open(tmpdirname + "/" + file_prefix + str(self.testUUID) + ".json", 'r') as f:
+                exported_data = json.load(f)
+                print(f"No. of entries in JSON file: {len(exported_data)}")
+                self.assertEqual(len(exported_data), 1906)
 
     def tearDown(self):
         etc.dropAllCollections(edb._get_current_db())
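For reference, once a user's data has been purged with the default settings, it can be restored with: python bin/restore_user_timeseries.py -f /tmp/old_timeseries_<user_uuid>.json — where <user_uuid> is a placeholder for the purged user's UUID, and the path follows the DEFAULT_DIR_NAME and DEFAULT_FILE_PREFIX values defined in bin/purge_user_timeseries.py above.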