From 740c75dcd2551683a632965dc36f51cb5768d341 Mon Sep 17 00:00:00 2001 From: Jack Greenlee Date: Thu, 30 Jan 2025 14:26:15 -0500 Subject: [PATCH] refactor restart_checking.py to reduce number of queries restart_checking has 2 functions that are called repeatedly from both segmentation filter classes: `is_tracking_restarted_in_range` and `get_ongoing_motion_in_range`. Both of these functions performed a DB query (for `statemachine/transition` and `background/motion_activity` entries, respectively) In the case of the former, we already have all the `statemachine/transition` in the current processing time range kept in memory as a dataframe. This can be passed as an (optional) argument; we just have to then filter it down to the range of start_ts -> end_ts. We do not have all the `background/motion_activity` kept as a dataframe but we can load them all at once and filter down later, just as we do with the`statemachine/transition`. This will use more memory but I think it is likely still more efficient than multiple queries because DB calls are a bottleneck on production. I will investigate the effect of these changes further, but by my inital estimates, this drastically reduces the number of DB queries during trip segmentation (by a magnitude of ~100) --- .../intake/segmentation/restart_checking.py | 36 ++++++++++++------- .../dwell_segmentation_dist_filter.py | 5 +-- .../dwell_segmentation_time_filter.py | 21 +++++------ 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/emission/analysis/intake/segmentation/restart_checking.py b/emission/analysis/intake/segmentation/restart_checking.py index 8ecc25e98..72fc54f5b 100644 --- a/emission/analysis/intake/segmentation/restart_checking.py +++ b/emission/analysis/intake/segmentation/restart_checking.py @@ -9,34 +9,46 @@ import emission.core.wrapper.transition as ecwt import emission.storage.timeseries.timequery as estt -def is_tracking_restarted_in_range(start_ts, end_ts, timeseries): +def is_tracking_restarted_in_range(start_ts, end_ts, timeseries, transition_df=None): """ Check to see if tracing was restarted between the times specified :param start_ts: the start of the time range to check :param end_ts: the end of the time range to check :param timeseries: the timeseries to use for checking + :param transition_df: dataframe of transitions to use (if None, will be fetched from timeseries) :return: """ - import emission.storage.timeseries.timequery as estt + if transition_df is not None: + transition_df = transition_df[ + (transition_df['ts'] >= start_ts) & (transition_df['ts'] <= end_ts) + ] + else: + import emission.storage.timeseries.timequery as estt + tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts, + endTs=end_ts) + transition_df = timeseries.get_data_df("statemachine/transition", tq) - tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts, - endTs=end_ts) - transition_df = timeseries.get_data_df("statemachine/transition", tq) if len(transition_df) == 0: logging.debug("In range %s -> %s found no transitions" % - (tq.startTs, tq.endTs)) + (start_ts, end_ts)) return False logging.debug("In range %s -> %s found transitions %s" % - (tq.startTs, tq.endTs, transition_df[["fmt_time", "curr_state", "transition"]])) + (start_ts, end_ts, transition_df[["fmt_time", "curr_state", "transition"]])) return _is_tracking_restarted_android(transition_df) or \ _is_tracking_restarted_ios(transition_df) -def get_ongoing_motion_in_range(start_ts, end_ts, timeseries): - tq = estt.TimeQuery(timeType = "data.ts", startTs = start_ts, - endTs = end_ts) - motion_list = list(timeseries.find_entries(["background/motion_activity"], tq)) +def get_ongoing_motion_in_range(start_ts, end_ts, timeseries, motion_list=None): + if motion_list is not None: + motion_list = [ + m for m in motion_list if m['data']['ts'] >= start_ts and m['data']['ts'] <= end_ts + ] + else: + tq = estt.TimeQuery(timeType = "data.ts", startTs = start_ts, + endTs = end_ts) + motion_list = list(timeseries.find_entries(["background/motion_activity"], tq)) + logging.debug("Found %s motion_activity entries in range %s -> %s" % - (len(motion_list), tq.startTs, tq.endTs)) + (len(motion_list), start_ts, end_ts)) logging.debug("sample activities are %s" % motion_list[0:5]) return motion_list diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index 415e9087d..0060766d4 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -63,6 +63,7 @@ def segment_into_trips(self, timeseries, time_query, filtered_points_df): self.filtered_points_df.loc[:, "valid"] = True self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) + self.motion_list = list(timeseries.find_entries(["background/motion_activity"], time_query)) if len(self.transition_df) > 0: logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) @@ -207,14 +208,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries): # for this kind of test speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2))) - if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries): + if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries, self.transition_df): logging.debug("tracking was restarted, ending trip") return True # In general, we get multiple locations between each motion activity. If we see a bunch of motion activities # between two location points, and there is a large gap between the last location and the first # motion activity as well, let us just assume that there was a restart - ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries) + ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries, self.motion_list) ongoing_motion_check = len(ongoing_motion_in_range) > 0 if timeDelta > self.time_threshold and not ongoing_motion_check: logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" % diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index b302e50db..1565e9bf1 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -74,10 +74,11 @@ def segment_into_trips(self, timeseries, time_query, filtered_points_df): (filtered_points_df.metadata_write_ts - filtered_points_df.ts) < 1000 ] filtered_points_df.reset_index(inplace=True) - transition_df = timeseries.get_data_df("statemachine/transition", time_query) + self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) + self.motion_list = list(timeseries.find_entries(["background/motion_activity"], time_query)) - if len(transition_df) > 0: - logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) + if len(self.transition_df) > 0: + logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) else: logging.debug("no transitions found. This can happen for continuous sensing") @@ -185,11 +186,11 @@ def segment_into_trips(self, timeseries, time_query, filtered_points_df): t_loop.elapsed ) - logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % - (just_ended, len(transition_df))) - if not just_ended and len(transition_df) > 0: - stopped_moving_after_last = transition_df[ - (transition_df.ts > currPoint.ts) & (transition_df.transition == 2) + logging.debug("Iterated over all points, just_ended = %s, len(self.transition_df) = %s" % + (just_ended, len(self.transition_df))) + if not just_ended and len(self.transition_df) > 0: + stopped_moving_after_last = self.transition_df[ + (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2) ] logging.debug("looking after %s, found transitions %s" % (currPoint.ts, stopped_moving_after_last)) @@ -252,11 +253,11 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc speedDelta = np.nan speedThreshold = old_div(float(self.distance_threshold), self.time_threshold) - if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries): + if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries, self.transition_df): logging.debug("tracking was restarted, ending trip") return True - ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0 + ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries, self.motion_list)) > 0 if timeDelta > 2 * self.time_threshold and not ongoing_motion_check: logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" % (prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check))