Storing data as JSON + Restore code added
Choosing JSON instead of CSV since:
1. CSV does not retain the nested, dict-like structure of MongoDB documents.
2. CSV also stores redundant empty NaN columns.
(Both points are illustrated in the short sketch below, after the file summary.)
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Jan 8, 2024
1 parent 0d0a0ba commit ae6eae6
Showing 3 changed files with 106 additions and 24 deletions.
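To illustrate both points, here is a minimal sketch; the document shape and values are made up for illustration and are not taken from this commit. pd.json_normalize flattens nested documents into dotted columns and fills missing keys with NaN, while a JSON round-trip preserves the nesting as-is.

import json
import pandas as pd

# Two hypothetical timeseries documents whose nested "data" dicts have different keys.
docs = [
    {"metadata": {"key": "background/location", "write_ts": 1.0},
     "data": {"latitude": 37.4, "longitude": -122.1}},
    {"metadata": {"key": "statemachine/transition", "write_ts": 2.0},
     "data": {"transition": "EXITED_GEOFENCE"}},
]

# CSV path: flattening produces one dotted column per nested key, and every document
# that lacks a key gets a NaN cell (the redundant empty columns the commit message mentions).
flat = pd.json_normalize(docs)
print(flat.columns.tolist())        # e.g. ['metadata.key', 'metadata.write_ts', 'data.latitude', 'data.longitude', 'data.transition']
print(int(flat.isna().sum().sum())) # 3 NaN cells for just these two documents

# JSON path: the nested structure round-trips unchanged.
assert json.loads(json.dumps(docs)) == docs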
28 changes: 21 additions & 7 deletions bin/purge_user_timeseries.py
@@ -8,16 +8,29 @@
import emission.core.wrapper.pipelinestate as ecwp
import emission.storage.pipeline_queries as esp
import pandas as pd

import pymongo
from bson import ObjectId
import json
from uuid import UUID

DEFAULT_DIR_NAME = "/tmp"
DEFAULT_FILE_PREFIX = "old_timeseries_"

def exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix):
    filename = dir_name + "/" + file_prefix + str(user_id) + ".csv"
    filename = dir_name + "/" + file_prefix + str(user_id) + ".json"
    all_data = list(edb.get_timeseries_db().find({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}}))
    # all_df = pd.json_normalize(all_data)
    # print(all_df)
    # all_df.to_csv(filename)

    def custom_encoder(obj):
        if isinstance(obj, (UUID, ObjectId)):
            return str(obj)
        raise TypeError(f"Type {type(obj)} not serializable")

    with open(filename, 'w') as file:
        json.dump(all_data, file, default=custom_encoder)

    logging.info("Old timeseries data exported to {}".format(filename))

def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, file_prefix=DEFAULT_FILE_PREFIX, unsafe_ignore_save=False):
@@ -28,18 +41,19 @@ def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, f

    cstate = esp.get_current_state(user_id, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
    last_ts_run = cstate['last_ts_run']
    print(f"last_ts_run : {last_ts_run}")

    if not last_ts_run:
        logging.warning("No processed timeserie for user {}".format(user_id))
        logging.warning("No processed timeseries for user {}".format(user_id))
        exit(1)

    if unsafe_ignore_save is True:
        logging.warning("CSV export was ignored")
    else:
        exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix)

    res = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}})
    logging.info("{} deleted entries since {}".format(res.deleted_count, datetime.fromtimestamp(last_ts_run)))
    # res = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}})
    # logging.info("{} deleted entries since {}".format(res.deleted_count, datetime.fromtimestamp(last_ts_run)))

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
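For reference, a small usage sketch of custom_encoder from the export function above; the ObjectId and UUID values are made up. BSON-specific types are serialized as plain strings, so the exported JSON no longer records that _id was an ObjectId or that user_id was a UUID; anything that needs the original types back has to convert them on restore.

import json
from uuid import UUID
from bson import ObjectId

def custom_encoder(obj):
    if isinstance(obj, (UUID, ObjectId)):
        return str(obj)
    raise TypeError(f"Type {type(obj)} not serializable")

# Hypothetical entry with the same id fields as a timeseries document.
doc = {"_id": ObjectId("65a0c0ffee0ddba11ca70123"),
       "user_id": UUID("12345678-1234-5678-1234-567812345678"),
       "metadata": {"write_ts": 1.0}}

print(json.dumps(doc, default=custom_encoder))
# {"_id": "65a0c0ffee0ddba11ca70123", "user_id": "12345678-1234-5678-1234-567812345678", "metadata": {"write_ts": 1.0}}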
38 changes: 38 additions & 0 deletions bin/restore_user_timeseries.py
@@ -0,0 +1,38 @@
import logging
import argparse
import uuid
from datetime import datetime
import emission.core.wrapper.user as ecwu
import emission.core.get_database as edb
import emission.core.wrapper.pipelinestate as ecwp
import emission.core.wrapper.pipelinestate as ecwp
import emission.storage.pipeline_queries as esp
import pandas as pd
import pymongo
from bson import ObjectId
import json

def restoreUserTimeseries(filename):
    # df = pd.read_csv(filename)
    # df['_id'] = df['_id'].apply(lambda x: ObjectId(x))
    # data = df.to_dict(orient='records')
    # print(df)
    # result = edb.get_timeseries_db().insert_many(data)

    with open(filename, 'r') as file:
        data = json.load(file)
        result = edb.get_timeseries_db().insert_many(data)

    logging.info("{} documents successfully inserted".format(len(result.inserted_ids)))

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    parser = argparse.ArgumentParser(prog="restore_user_timeseries")
    parser.add_argument(
        "-f", "--file_name",
        help="Path to the CSV file containing data to be imported"
    )

    args = parser.parse_args()
    restoreUserTimeseries(args.file_name)
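Because the export stringifies ObjectId and UUID values, insert_many above will restore them as plain strings. If type fidelity matters, a follow-up could convert the known fields back before inserting; a hedged sketch of that idea (restore_with_native_types is a hypothetical helper, not part of this commit):

import json
from uuid import UUID
from bson import ObjectId
import emission.core.get_database as edb

def restore_with_native_types(filename):
    with open(filename, 'r') as file:
        data = json.load(file)
    for entry in data:
        # These fields were turned into strings by custom_encoder during the export.
        if '_id' in entry:
            entry['_id'] = ObjectId(entry['_id'])
        # Converting user_id back assumes the MongoDB client is configured with a
        # uuid_representation that accepts UUID objects on insert.
        if 'user_id' in entry:
            entry['user_id'] = UUID(entry['user_id'])
    result = edb.get_timeseries_db().insert_many(data)
    return len(result.inserted_ids)

As committed, the script is invoked with the exported file, e.g. python bin/restore_user_timeseries.py -f /tmp/old_timeseries_<uuid>.json (using the default directory and file prefix from the purge script).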
64 changes: 47 additions & 17 deletions emission/tests/binTests/TestPurgeUserTimeseries.py
@@ -17,31 +17,61 @@ def setUp(self):
        etc.runIntakePipeline(self.testUUID)

    def testPurgeUserTimeseries(self):
        with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
            cstate = esp.get_current_state(self.testUUID, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
            last_ts_run = cstate['last_ts_run']
            self.assertTrue(last_ts_run > 0)
        # with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
        # cstate = esp.get_current_state(self.testUUID, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
        # last_ts_run = cstate['last_ts_run']
        # self.assertTrue(last_ts_run > 0)

            # Check how much data there was before
            res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
            self.assertEqual(res, 1906)
        # # Check how much data there was before
        # res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
        # print(f"About to purge {res} entries")
        # self.assertEqual(res, 1906)

            # Run the purge function
            file_prefix = "some_fancy_prefix_"
            purgeUserTimeseries(str(self.testUUID), dir_name=tmpdirname, file_prefix=file_prefix)
        # # Run the purge function
        # file_prefix = "some_fancy_prefix_"
        # purgeUserTimeseries(str(self.testUUID), dir_name=tmpdirname, file_prefix=file_prefix)

            # Check how much data there is after
            res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
            self.assertEqual(res, 0)
        # # Check how much data there is after
        # res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
        # print(f"Purging complete: {res} entries remaining")
        # self.assertEqual(res, 0)

            # Check that data was properly saved (1906 lines of data + 1 line of header)
            with open(tmpdirname + "/" + file_prefix + str(self.testUUID) + ".csv", 'r') as f:
                self.assertTrue(f.readlines(), 1907)
        # # Check that data was properly saved (1906 lines of data + 1 line of header)
        # with open(tmpdirname + "/" + file_prefix + str(self.testUUID) + ".csv", 'r') as f:
        # csv_lines = f.readlines()
        # print(f"No. of entries in CSV file: {len(csv_lines)}")
        # self.assertEqual(len(csv_lines), 1907)


        tmpdirname = "/Users/mmahadik/Documents/GitHub/logs/data/purge_csv"
        cstate = esp.get_current_state(self.testUUID, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
        last_ts_run = cstate['last_ts_run']
        self.assertTrue(last_ts_run > 0)

        # Check how much data there was before
        res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
        print(f"About to purge {res} entries")
        self.assertEqual(res, 1906)

        # Run the purge function
        file_prefix = "some_fancy_prefix_"
        purgeUserTimeseries(str(self.testUUID), dir_name=tmpdirname, file_prefix=file_prefix)

        # Check how much data there is after
        res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
        print(f"Purging complete: {res} entries remaining")
        self.assertEqual(res, 0)

        # Check that data was properly saved (1906 lines of data + 1 line of header)
        with open(tmpdirname + "/" + file_prefix + str(self.testUUID) + ".csv", 'r') as f:
            csv_lines = f.readlines()
            print(f"No. of entries in CSV file: {len(csv_lines)}")
            self.assertEqual(len(csv_lines), 1907)

    def tearDown(self):
        etc.dropAllCollections(edb._get_current_db())


if __name__ == '__main__':
    etc.configLogging()
    unittest.main()
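Note that the final check in testPurgeUserTimeseries still opens a .csv file and compares line counts, while the export now writes a .json file. A hedged sketch of what the equivalent check against the JSON export could look like (not part of this commit):

        # Hypothetical replacement for the CSV line-count check in testPurgeUserTimeseries.
        import json
        with open(tmpdirname + "/" + file_prefix + str(self.testUUID) + ".json", 'r') as f:
            exported = json.load(f)
        print(f"No. of entries in JSON file: {len(exported)}")
        # json.dump writes a single list with one entry per exported document, so there is no header line.
        self.assertEqual(len(exported), 1906)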
