Skip to content

Commit

Permalink
refactor restart_checking.py to reduce number of queries
Browse files Browse the repository at this point in the history
restart_checking has 2 functions that are called repeatedly from both segmentation filter classes: `is_tracking_restarted_in_range` and `get_ongoing_motion_in_range`.
Both of these functions performed a DB query (for `statemachine/transition` and `background/motion_activity` entries, respectively)

In the case of the former, we already have all the `statemachine/transition` entries in the current processing time range kept in memory as a dataframe. This can be passed as an (optional) argument; we then just have to filter it down to the range of start_ts -> end_ts.

We do not have all the `background/motion_activity` entries kept as a dataframe, but we can load them all at once and filter down later, just as we do with the `statemachine/transition` entries. This will use more memory, but I think it is likely still more efficient than multiple queries because DB calls are a bottleneck on production.

I will investigate the effect of these changes further, but by my initial estimates, this drastically reduces the number of DB queries during trip segmentation (by a factor of ~100)
  • Loading branch information
JGreenlee committed Jan 30, 2025
1 parent d552b76 commit 740c75d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
36 changes: 24 additions & 12 deletions emission/analysis/intake/segmentation/restart_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,46 @@
import emission.core.wrapper.transition as ecwt
import emission.storage.timeseries.timequery as estt

def is_tracking_restarted_in_range(start_ts, end_ts, timeseries, transition_df=None):
    """
    Check to see if tracking was restarted between the times specified

    :param start_ts: the start of the time range to check
    :param end_ts: the end of the time range to check
    :param timeseries: the timeseries to use for checking; it is only queried
        when transition_df is None
    :param transition_df: optional dataframe of already-loaded
        "statemachine/transition" entries. It is narrowed to the rows with
        start_ts <= ts <= end_ts; the caller's dataframe is not modified.
        If None, the transitions are fetched from timeseries instead
    :return: False if there are no transitions in the range; otherwise the
        result of the android restart check or, failing that, the iOS one
    """
    if transition_df is None:
        # No preloaded transitions: fall back to one query for this range.
        # `estt` is the module-level timequery import.
        tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
                            endTs=end_ts)
        transition_df = timeseries.get_data_df("statemachine/transition", tq)
    elif len(transition_df) > 0:
        # Only filter frames that have rows. A dataframe with no rows may
        # also have no `ts` column (e.g. a bare `pd.DataFrame()`, presumably
        # what get_data_df returns when nothing matches -- TODO confirm), and
        # indexing it would raise KeyError.
        in_range = (transition_df['ts'] >= start_ts) & (transition_df['ts'] <= end_ts)
        transition_df = transition_df[in_range]

    if len(transition_df) == 0:
        logging.debug("In range %s -> %s found no transitions",
                      start_ts, end_ts)
        return False

    logging.debug("In range %s -> %s found transitions %s",
                  start_ts, end_ts,
                  transition_df[["fmt_time", "curr_state", "transition"]])
    return _is_tracking_restarted_android(transition_df) or \
        _is_tracking_restarted_ios(transition_df)

def get_ongoing_motion_in_range(start_ts, end_ts, timeseries, motion_list=None):
    """
    Return the "background/motion_activity" entries between the times specified

    :param start_ts: the start of the time range to check
    :param end_ts: the end of the time range to check
    :param timeseries: the timeseries to use for checking; it is only queried
        when motion_list is None
    :param motion_list: optional list of already-loaded motion activity
        entries, each readable as entry['data']['ts']. It is narrowed to the
        entries with start_ts <= ts <= end_ts; the caller's list is not
        modified. If None, the entries are fetched from timeseries instead
    :return: list of the motion activity entries in the range
    """
    if motion_list is None:
        # No preloaded entries: fall back to one query for this range
        tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
                            endTs=end_ts)
        motion_list = list(timeseries.find_entries(["background/motion_activity"], tq))
    else:
        motion_list = [
            m for m in motion_list
            if start_ts <= m['data']['ts'] <= end_ts
        ]

    # Log the requested bounds directly; no query object exists when the
    # entries were supplied by the caller
    logging.debug("Found %s motion_activity entries in range %s -> %s" %
                  (len(motion_list), start_ts, end_ts))
    logging.debug("sample activities are %s" % motion_list[0:5])
    return motion_list

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def segment_into_trips(self, timeseries, time_query, filtered_points_df):
self.filtered_points_df.loc[:, "valid"] = True

self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
self.motion_list = list(timeseries.find_entries(["background/motion_activity"], time_query))

if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
Expand Down Expand Up @@ -207,14 +208,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries):
# for this kind of test
speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2)))

if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries):
if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries, self.transition_df):
logging.debug("tracking was restarted, ending trip")
return True

# In general, we get multiple locations between each motion activity. If we see a bunch of motion activities
# between two location points, and there is a large gap between the last location and the first
# motion activity as well, let us just assume that there was a restart
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries)
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries, self.motion_list)
ongoing_motion_check = len(ongoing_motion_in_range) > 0
if timeDelta > self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ def segment_into_trips(self, timeseries, time_query, filtered_points_df):
(filtered_points_df.metadata_write_ts - filtered_points_df.ts) < 1000
]
filtered_points_df.reset_index(inplace=True)
transition_df = timeseries.get_data_df("statemachine/transition", time_query)
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
self.motion_list = list(timeseries.find_entries(["background/motion_activity"], time_query))

if len(transition_df) > 0:
logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
else:
logging.debug("no transitions found. This can happen for continuous sensing")

Expand Down Expand Up @@ -185,11 +186,11 @@ def segment_into_trips(self, timeseries, time_query, filtered_points_df):
t_loop.elapsed
)

logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
(just_ended, len(transition_df)))
if not just_ended and len(transition_df) > 0:
stopped_moving_after_last = transition_df[
(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)
logging.debug("Iterated over all points, just_ended = %s, len(self.transition_df) = %s" %
(just_ended, len(self.transition_df)))
if not just_ended and len(self.transition_df) > 0:
stopped_moving_after_last = self.transition_df[
(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
]
logging.debug("looking after %s, found transitions %s" %
(currPoint.ts, stopped_moving_after_last))
Expand Down Expand Up @@ -252,11 +253,11 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc
speedDelta = np.nan
speedThreshold = old_div(float(self.distance_threshold), self.time_threshold)

if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries):
if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries, self.transition_df):
logging.debug("tracking was restarted, ending trip")
return True

ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0
ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries, self.motion_list)) > 0
if timeDelta > 2 * self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
(prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check))
Expand Down

0 comments on commit 740c75d

Please sign in to comment.