Remove timing measurement for the bottom 80% of tracked functions
- Removed timing for less significant functions in the TRIP_SEGMENTATION pipeline
- Focused the retained measurements on the key contributors
- Prepared for local testing to validate that `create_time_filter` is triggered
- Will explore timing optimizations for the top 20% of functions in a follow-up commit
TeachMeTW committed Nov 23, 2024
1 parent 3900d3c commit 0d2a2ed
Showing 2 changed files with 65 additions and 112 deletions.
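For context, the instrumentation pattern being removed below wraps each pipeline step in an `ect.Timer` context manager and then records the elapsed time under a per-stage key via `esds.store_pipeline_time`. The following is a minimal, self-contained sketch of that pattern, assuming only that `ect.Timer` exposes an `elapsed` attribute after the `with` block (as its usage in the diff suggests); the `Timer` class and the `print` call are stand-ins, not the project's API:

```python
import time

class Timer:
    """Stand-in for ect.Timer: records elapsed wall-clock seconds."""
    def __enter__(self):
        self._start = time.time()
        return self

    def __exit__(self, *exc):
        self.elapsed = time.time() - self._start
        return False  # do not suppress exceptions

# Timed form (what this commit removes for low-impact steps):
with Timer() as t_step:
    result = sum(range(1_000_000))  # placeholder for e.g. ts.get_data_df(...)
print("TRIP_SEGMENTATION/step took %.6f s" % t_step.elapsed)  # stand-in for esds.store_pipeline_time(...)

# Untimed form (what the commit leaves behind):
result = sum(range(1_000_000))
```

Dropping the wrapper for cheap steps eliminates one `store_pipeline_time` call per step on every pipeline run.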
61 changes: 24 additions & 37 deletions emission/analysis/intake/segmentation/trip_segmentation.py
@@ -51,62 +51,49 @@ def segment_into_trips(self, timeseries, time_query):
         pass
 
 def segment_current_trips(user_id):
-    with ect.Timer() as t_get_time_series:
-        ts = esta.TimeSeries.get_time_series(user_id)
-    esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_series", time.time(), t_get_time_series.elapsed)
+    ts = esta.TimeSeries.get_time_series(user_id)
 
-    with ect.Timer() as t_get_time_range:
-        time_query = epq.get_time_range_for_segmentation(user_id)
-    esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_range_for_segmentation", time.time(), t_get_time_range.elapsed)
+    time_query = epq.get_time_range_for_segmentation(user_id)
 
     import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf
     import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf
 
-    with ect.Timer() as t_create_time_filter:
-        dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins
-                                                  point_threshold=9,
-                                                  distance_threshold=100) # 100 m
-    esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_time_filter", time.time(), t_create_time_filter.elapsed)
+    dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins
+                                              point_threshold=9,
+                                              distance_threshold=100) # 100 m
 
-    with ect.Timer() as t_create_dist_filter:
-        dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins
-                                                  point_threshold=9,
-                                                  distance_threshold=50) # 50 m
-    esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_dist_filter", time.time(), t_create_dist_filter.elapsed)
+    dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins
+                                              point_threshold=9,
+                                              distance_threshold=50) # 50 m
 
     filter_methods = {"time": dstfsm, "distance": dsdfsm}
     filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"}
 
     # We need to use the appropriate filter based on the incoming data
     # So let's read in the location points for the specified query
-    with ect.Timer() as t_get_data_df:
-        loc_df = ts.get_data_df("background/filtered_location", time_query)
-    esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_data_df", time.time(), t_get_data_df.elapsed)
+    loc_df = ts.get_data_df("background/filtered_location", time_query)
 
     if len(loc_df) == 0:
         # no new segments, no need to keep looking at these again
         logging.debug("len(loc_df) == 0, early return")
         epq.mark_segmentation_done(user_id, None)
         return
 
-    with ect.Timer() as t_handle_out_of_order:
-        out_of_order_points = loc_df[loc_df.ts.diff() < 0]
-        if len(out_of_order_points) > 0:
-            logging.info("Found out of order points!")
-            logging.info("%s" % out_of_order_points)
-            # drop from the table
-            loc_df = loc_df.drop(out_of_order_points.index.tolist())
-            loc_df.reset_index(inplace=True)
-            # invalidate in the database.
-            out_of_order_id_list = out_of_order_points["_id"].tolist()
-            logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
-            for ooid in out_of_order_id_list:
-                ts.invalidate_raw_entry(ooid)
-    esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/handle_out_of_order_points", time.time(), t_handle_out_of_order.elapsed)
-
-    with ect.Timer() as t_get_filters:
-        filters_in_df = loc_df["filter"].dropna().unique()
-    esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_filters_in_df", time.time(), t_get_filters.elapsed)
+    out_of_order_points = loc_df[loc_df.ts.diff() < 0]
+    if len(out_of_order_points) > 0:
+        logging.info("Found out of order points!")
+        logging.info("%s" % out_of_order_points)
+        # drop from the table
+        loc_df = loc_df.drop(out_of_order_points.index.tolist())
+        loc_df.reset_index(inplace=True)
+        # invalidate in the database.
+        out_of_order_id_list = out_of_order_points["_id"].tolist()
+        logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
+        for ooid in out_of_order_id_list:
+            ts.invalidate_raw_entry(ooid)
+
+
+    filters_in_df = loc_df["filter"].dropna().unique()
 
     logging.debug("Filters in the dataframe = %s" % filters_in_df)
     if len(filters_in_df) == 1:
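The out-of-order handling retained above hinges on `Series.diff()`: a negative first difference in the `ts` column marks a point whose timestamp goes backwards relative to its predecessor. Here is a minimal sketch with synthetic data (column names match the diff; the real code additionally invalidates the dropped entries in the database):

```python
import pandas as pd

# Synthetic location points; the third point's timestamp goes backwards.
loc_df = pd.DataFrame({"_id": ["a", "b", "c", "d"],
                       "ts": [1.0, 2.0, 1.5, 3.0]})

out_of_order_points = loc_df[loc_df.ts.diff() < 0]
print(out_of_order_points)  # row "c": ts 1.5 < preceding ts 2.0

loc_df = loc_df.drop(out_of_order_points.index.tolist())
loc_df.reset_index(inplace=True)  # mirrors the diff; keeps the old index as a column
```

Note that `diff()` only flags points that are earlier than their immediate predecessor, so an isolated backwards jump is dropped while the surrounding points survive.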
116 changes: 41 additions & 75 deletions emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py
@@ -60,23 +60,9 @@ def segment_into_trips(self, timeseries, time_query):
             t_get_filtered_points.elapsed
         )
 
-        with ect.Timer() as t_mark_valid:
-            self.filtered_points_df.loc[:, "valid"] = True
-        esds.store_pipeline_time(
-            user_id,
-            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/mark_valid",
-            time.time(),
-            t_mark_valid.elapsed
-        )
+        self.filtered_points_df.loc[:, "valid"] = True
 
-        with ect.Timer() as t_get_transition_df:
-            self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
-        esds.store_pipeline_time(
-            user_id,
-            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df",
-            time.time(),
-            t_get_transition_df.elapsed
-        )
+        self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
 
         if len(self.transition_df) > 0:
             logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
@@ -119,14 +105,7 @@ def segment_into_trips(self, timeseries, time_query):
                 # else:
                 sel_point = currPoint
                 logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
-                with ect.Timer() as t_set_start_point:
-                    curr_trip_start_point = sel_point
-                esds.store_pipeline_time(
-                    user_id,
-                    ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_new_trip_start_point",
-                    time.time(),
-                    t_set_start_point.elapsed
-                )
+                curr_trip_start_point = sel_point
                 just_ended = False
             else:
                 with ect.Timer() as t_process_trip:
@@ -186,57 +165,44 @@ def segment_into_trips(self, timeseries, time_query):
             t_loop.elapsed
         )
 
-        with ect.Timer() as t_post_loop:
-            # Since we only end a trip when we start a new trip, this means that
-            # the last trip that was pushed is ignored. Consider the example of
-            # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
-            # so during that remote push, a trip end was not detected. And we got
-            # back home shortly after 5pm, so the trip end was only detected on the
-            # phone at 6pm. At that time, the following points were pushed:
-            # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
-            # ..., 2016-02-22T16:57:04
-            # Then, on the server, while iterating through the points, we detected
-            # a trip end at 16:04, and a new trip start at 16:49. But we did not
-            # detect the trip end at 16:57, because we didn't start a new point.
-            # This has two issues:
-            # - we won't see this trip until the next trip start, which may be on the next day
-            # - we won't see this trip at all, because when we run the pipeline the
-            # next time, we will only look at points from that time onwards. These
-            # points have been marked as "processed", so they won't even be considered.
-
-            # There are multiple potential fixes:
-            # - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
-            # - we can mark a trip end based on the fact that we only push data
-            # when a trip ends, so if we have data, it means that the trip has been
-            # detected as ended on the phone.
-            # This seems a bit fragile - what if we start pushing incomplete trip
-            # data for efficiency reasons? Therefore, we also check to see if there
-            # is a trip_end_detected in this timeframe after the last point. If so,
-            # then we end the trip at the last point that we have.
-            if not just_ended and len(self.transition_df) > 0:
-                with ect.Timer() as t_check_transitions:
-                    stopped_moving_after_last = self.transition_df[
-                        (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
-                    ]
-                    logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
-                    if len(stopped_moving_after_last) > 0:
-                        logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
-                        segmentation_points.append((curr_trip_start_point, currPoint))
-                        self.last_ts_processed = currPoint.metadata_write_ts
-                    else:
-                        logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
-                esds.store_pipeline_time(
-                    user_id,
-                    ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/check_transitions_post_loop",
-                    time.time(),
-                    t_check_transitions.elapsed
-                )
-        esds.store_pipeline_time(
-            user_id,
-            ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/post_loop",
-            time.time(),
-            t_post_loop.elapsed
-        )
+
+        # Since we only end a trip when we start a new trip, this means that
+        # the last trip that was pushed is ignored. Consider the example of
+        # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
+        # so during that remote push, a trip end was not detected. And we got
+        # back home shortly after 5pm, so the trip end was only detected on the
+        # phone at 6pm. At that time, the following points were pushed:
+        # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
+        # ..., 2016-02-22T16:57:04
+        # Then, on the server, while iterating through the points, we detected
+        # a trip end at 16:04, and a new trip start at 16:49. But we did not
+        # detect the trip end at 16:57, because we didn't start a new point.
+        # This has two issues:
+        # - we won't see this trip until the next trip start, which may be on the next day
+        # - we won't see this trip at all, because when we run the pipeline the
+        # next time, we will only look at points from that time onwards. These
+        # points have been marked as "processed", so they won't even be considered.
+
+        # There are multiple potential fixes:
+        # - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
+        # - we can mark a trip end based on the fact that we only push data
+        # when a trip ends, so if we have data, it means that the trip has been
+        # detected as ended on the phone.
+        # This seems a bit fragile - what if we start pushing incomplete trip
+        # data for efficiency reasons? Therefore, we also check to see if there
+        # is a trip_end_detected in this timeframe after the last point. If so,
+        # then we end the trip at the last point that we have.
+        if not just_ended and len(self.transition_df) > 0:
+            stopped_moving_after_last = self.transition_df[
+                (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
+            ]
+            logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
+            if len(stopped_moving_after_last) > 0:
+                logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
+                segmentation_points.append((curr_trip_start_point, currPoint))
+                self.last_ts_processed = currPoint.metadata_write_ts
+            else:
+                logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
 
         return segmentation_points
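
The post-loop check retained above asks whether the phone reported a "stopped moving" transition after the last processed point; if so, the still-open trip is closed at that point. A minimal sketch with synthetic data, assuming (per the logic in the diff) that `transition == 2` encodes "stopped moving":

```python
import pandas as pd
from collections import namedtuple

# Synthetic transition stream from the phone.
transition_df = pd.DataFrame({"ts": [100.0, 250.0, 300.0],
                              "transition": [1, 2, 2]})

CurrPoint = namedtuple("CurrPoint", ["ts"])
currPoint = CurrPoint(ts=200.0)  # last location point seen by the loop

stopped_moving_after_last = transition_df[
    (transition_df.ts > currPoint.ts) & (transition_df.transition == 2)
]
print(len(stopped_moving_after_last))  # 2 -> end the open trip at currPoint
```

In the diff above, the same boolean filter runs against the real `transition_df` pulled from the `statemachine/transition` stream.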

