Merge pull request #562 from shankari/improve_multi_timeline_stuff
Support storing and loading pipeline states as part of the multi-timeline scripts
shankari authored Jan 15, 2018
2 parents d34397e + c226560 commit 8a64611
Showing 2 changed files with 72 additions and 22 deletions.
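
With this change, the export script writes two gzipped JSON files per user instead of one: the existing timeline dump plus a new pipeline state dump. A minimal sketch of the naming convention, not part of the commit itself (the prefix and uuid are placeholders):

# Sketch only: the two files produced per user for a given file_prefix.
# The prefix and uuid below are placeholders, not values from the commit.
import uuid

file_prefix = "/tmp/dump"
user_id = uuid.UUID("00000000-0000-0000-0000-000000000000")

timeline_file = "%s_%s.gz" % (file_prefix, user_id)                # loc/trip/place entries
pipeline_file = "%s_pipelinestate_%s.gz" % (file_prefix, user_id)  # pipeline states (new here)
print(timeline_file)
print(pipeline_file)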
72 changes: 54 additions & 18 deletions bin/debug/extract_timeline_for_day_range_and_user.py
@@ -19,6 +19,7 @@
import arrow
import argparse

import emission.core.wrapper.user as ecwu
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt
import emission.storage.decorations.user_queries as esdu
@@ -41,8 +42,14 @@ def export_timeline(user_id, start_day_str, end_day_str, file_name):
trip_entry_list = list(ts.find_entries(key_list=None, time_query=trip_time_query))
place_time_query = estt.TimeQuery("data.enter_ts", start_day_ts, end_day_ts)
place_entry_list = list(ts.find_entries(key_list=None, time_query=place_time_query))
# Handle the case of the first place, which has no enter_ts and won't be
# matched by the default query
first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}},
{'data.exit_ts': {'$exists': True}}]}
first_place_entry_list = list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[first_place_extra_query]))
logging.info("First place entry list = %s" % first_place_entry_list)

combined_list = loc_entry_list + trip_entry_list + place_entry_list
combined_list = loc_entry_list + trip_entry_list + place_entry_list + first_place_entry_list
logging.info("Found %d loc entries, %d trip-like entries, %d place-like entries = %d total entries" %
(len(loc_entry_list), len(trip_entry_list), len(place_entry_list), len(combined_list)))
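
The first-place handling above is a plain MongoDB filter, so it can also be checked directly against the database. A minimal pymongo sketch, not part of the commit; the database and collection names ("Stage_database", "Stage_analysis_timeseries") and the uuid are assumptions/placeholders for a standard local e-mission setup:

# Sketch only: run the same "first place" filter directly against MongoDB.
# The db/collection names and the uuid are assumptions, not from the commit.
import uuid
import pymongo

client = pymongo.MongoClient("localhost")
place_coll = client.Stage_database.Stage_analysis_timeseries

user_id = uuid.UUID("00000000-0000-0000-0000-000000000000")
first_place_filter = {"user_id": user_id,
                      "$and": [{"data.enter_ts": {"$exists": False}},
                               {"data.exit_ts": {"$exists": True}}]}

for entry in place_coll.find(first_place_filter):
    print(entry["metadata"]["key"], entry["_id"])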

@@ -53,9 +60,25 @@ def export_timeline(user_id, start_day_str, end_day_str, file_name):
if len(combined_list) == 0 or unique_key_list == set(['stats/pipeline_time']):
logging.info("No entries found in range for user %s, skipping save" % user_id)
else:
# Also dump the pipeline states, since they record how far the analysis has run.
# This allows us to copy data to a different *live system*, not just
# duplicate it for analysis
combined_filename = "%s_%s.gz" % (file_name, user_id)
json.dump(combined_list,
gzip.open(combined_filename, "wt"), default=bju.default, allow_nan=False, indent=4)
with gzip.open(combined_filename, "wt") as gcfd:
json.dump(combined_list,
gcfd, default=bju.default, allow_nan=False, indent=4)

import emission.core.get_database as edb

pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
logging.info("Found %d pipeline states %s" %
(len(pipeline_state_list),
list([ps["pipeline_stage"] for ps in pipeline_state_list])))

pipeline_filename = "%s_pipelinestate_%s.gz" % (file_name, user_id)
with gzip.open(pipeline_filename, "wt") as gpfd:
json.dump(pipeline_state_list,
gpfd, default=bju.default, allow_nan=False, indent=4)
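
As a quick sanity check on the export side, the pipeline state dump written just above can be read back with the same gzip/json/bson combination. A minimal sketch, not part of the commit (the prefix and uuid are placeholders for real export output):

# Sketch only: read back a dumped pipeline state file and list the saved stages.
import gzip
import json
import uuid

import bson.json_util as bju

user_id = uuid.UUID("00000000-0000-0000-0000-000000000000")
pipeline_filename = "/tmp/dump_pipelinestate_%s.gz" % user_id

with gzip.open(pipeline_filename, "rt") as gpfd:
    states = json.load(gpfd, object_hook=bju.object_hook)

print("Dumped %d pipeline states for stages %s" %
      (len(states), [ps["pipeline_stage"] for ps in states]))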

def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
MAX_LIMIT = 25 * 10000
@@ -70,20 +93,33 @@ def export_timeline_for_users(user_id_list, args):
for curr_uuid in user_id_list:
if curr_uuid != '':
logging.info("=" * 50)
export_timeline(user_id=curr_uuid, start_day_str=sys.argv[2], end_day_str=sys.argv[3], file_name=sys.argv[4])

export_timeline(user_id=curr_uuid, start_day_str=args.start_day,
end_day_str= args.end_day, file_name=args.file_prefix)

if __name__ == '__main__':
if len(sys.argv) != 5:
print("Usage: %s [<user>|'all'|'file_XXX'] <start_day> <end_day> <file_prefix>" % (sys.argv[0]))
else:
user_id_str = sys.argv[1]
if user_id_str == "all":
all_uuids = esdu.get_all_uuids()
export_timeline_for_users(all_uuids, sys.argv)
elif user_id_str.startswith("file_"):
uuid_strs = json.load(open(user_id_str))
uuids = [uuid.UUID(ustr) for ustr in uuid_strs]
export_timeline_for_users(uuids, sys.argv)
else:
export_timeline(user_id=uuid.UUID(sys.argv[1]), start_day_str=sys.argv[2], end_day_str=sys.argv[3], file_name=sys.argv[4])
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser(prog="extract_timeline_for_day_range_and_user")

group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-e", "--user_email", nargs="+")
group.add_argument("-u", "--user_uuid", nargs="+")
group.add_argument("-a", "--all", action="store_true")
group.add_argument("-f", "--file")

parser.add_argument("start_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
parser.add_argument("end_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
parser.add_argument("file_prefix", help="prefix for the filenames generated - e.g /tmp/dump_ will generate files /tmp/dump_<uuid1>.gz, /tmp/dump_<uuid2>.gz..." )

args = parser.parse_args()

if args.user_uuid:
uuid_list = [uuid.UUID(uuid_str) for uuid_str in args.user_uuid]
elif args.user_email:
uuid_list = [ecwu.User.fromEmail(uuid_str).uuid for uuid_str in args.user_email]
elif args.all:
uuid_list = esdu.get_all_uuids()
elif args.file:
with open(args.file) as fd:
uuid_strs = json.load(fd)
uuid_list = [uuid.UUID(ustr) for ustr in uuid_strs]
export_timeline_for_users(uuid_list, args)
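
For reference, the new -e/--user_email option relies on the emission.core.wrapper.user import added at the top of the script to map emails to uuids. A minimal sketch, not part of the commit (the email is a placeholder and must belong to a registered user in the local database):

# Sketch only: map an email to a uuid the same way the -e option does.
import emission.core.wrapper.user as ecwu

test_email = "test_user@example.com"  # placeholder
mapped_uuid = ecwu.User.fromEmail(test_email).uuid
print("Email %s maps to uuid %s" % (test_email, mapped_uuid))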
22 changes: 18 additions & 4 deletions bin/debug/load_multi_timeline_for_range.py
@@ -45,14 +45,25 @@ def get_load_ranges(entries):
ranges.append((start_indices[-1], len(entries)))
return ranges

def load_pipeline_states(file_prefix, all_uuid_list):
import emission.core.get_database as edb
for curr_uuid in all_uuid_list:
pipeline_filename = "%s_pipelinestate_%s.gz" % (file_prefix, curr_uuid)
print("Loading pipeline state for %s from %s" %
(curr_uuid, pipeline_filename))
with gzip.open(pipeline_filename) as gfd:
states = json.load(gfd, object_hook = bju.object_hook)
edb.get_pipeline_state_db().insert_many(states)
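
Note that load_pipeline_states does not guard against loading the same states twice, so re-running the loader would insert duplicate pipeline state documents. A minimal pre-load cleanup sketch, not part of the commit (the delete-then-insert policy is an assumption), using the same edb accessor as the code above:

# Sketch only: remove previously loaded pipeline states for a user before
# calling load_pipeline_states again, so re-runs do not create duplicates.
import emission.core.get_database as edb

def clear_existing_pipeline_states(user_id):
    existing = edb.get_pipeline_state_db().find({"user_id": user_id}).count()
    if existing > 0:
        print("Removing %d existing pipeline states for %s before reload" %
              (existing, user_id))
        edb.get_pipeline_state_db().delete_many({"user_id": user_id})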

def post_check(unique_user_list, all_rerun_list):
import emission.core.get_database as edb
import numpy as np

logging.info("For %s users, loaded %s raw entries and %s processed entries" %
logging.info("For %s users, loaded %s raw entries, %s processed entries and %s pipeline states" %
(len(unique_user_list),
edb.get_timeseries_db().find({"user_id": {"$in": list(unique_user_list)}}).count(),
edb.get_analysis_timeseries_db().find({"user_id": {"$in": list(unique_user_list)}}).count()))
edb.get_analysis_timeseries_db().find({"user_id": {"$in": list(unique_user_list)}}).count(),
edb.get_pipeline_state_db().find({"user_id": {"$in": list(unique_user_list)}}).count()))

all_rerun_arr = np.array(all_rerun_list)

@@ -68,7 +79,7 @@ def post_check(unique_user_list, all_rerun_list):

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("timeline_filename",
parser.add_argument("file_prefix",
help="the name of the file or file prefix that contains the json representation of the timeline")

parser.add_argument("-d", "--debug", type=int,
@@ -92,14 +103,16 @@ def post_check(unique_user_list, all_rerun_list):
else:
logging.basicConfig(level=logging.INFO)

fn = args.timeline_filename
fn = args.file_prefix
logging.info("Loading file or prefix %s" % fn)
sel_file_list = common.read_files_with_prefix(fn)

all_user_list = []
all_rerun_list = []

for i, filename in enumerate(sel_file_list):
if "pipelinestate" in filename:
continue
logging.info("=" * 50)
logging.info("Loading data from file %s" % filename)

@@ -128,6 +141,7 @@

unique_user_list = set(all_user_list)
if not args.info_only:
load_pipeline_states(args.file_prefix, unique_user_list)
register_fake_users(args.prefix, unique_user_list)

post_check(unique_user_list, all_rerun_list)
