From d95b2a01f4e86df7e110c60a24610bd71a1e2a86 Mon Sep 17 00:00:00 2001
From: $aTyam <satyam.saini@rutgers.edu>
Date: Sat, 3 Feb 2024 23:44:15 -0500
Subject: [PATCH] Improving Trip Segmentation by reducing DB calls

The changes below, which led to these performance upgrades, are investigated in https://github.com/e-mission/e-mission-docs/issues/1041. They are:

1. DB calls for the transition and motion dataframes are moved upstream from the `is_tracking_restarted_in_range` and `get_ongoing_motion_in_range` functions in
 `restart_checking.py` to `trip_segmentation.py`. In the old setting these DB calls were made multiple times (once per iteration); in the improved setting they happen only once.

2. All the other changes in `trip_segmentation.py` and `dwell_segmentation_dist_filter.py` are just to support the change in point 1 (above).

3. In `dwell_segmentation_time_filter.py`, other than the changes to support point 1 (above), there is an additional improvement. The calculations for `last10PointsDistances` and `last5MinsDistances` are vectorised. For this, `calDistance` in `common.py` now supports numpy arrays.
---
 .../intake/segmentation/restart_checking.py   | 37 ++++++++++---------
 .../intake/segmentation/trip_segmentation.py  | 29 +++++++++------
 .../dwell_segmentation_dist_filter.py         | 12 +++---
 .../dwell_segmentation_time_filter.py         | 33 +++++++++--------
 .../trip_end_detection_corner_cases.py        |  4 +-
 emission/core/common.py                       | 16 +++++++-
 .../intakeTests/TestTripSegmentation.py       |  8 +++-
 7 files changed, 85 insertions(+), 54 deletions(-)

diff --git a/emission/analysis/intake/segmentation/restart_checking.py b/emission/analysis/intake/segmentation/restart_checking.py
index 8ecc25e98..2a0b484d8 100644
--- a/emission/analysis/intake/segmentation/restart_checking.py
+++ b/emission/analysis/intake/segmentation/restart_checking.py
@@ -9,7 +9,7 @@
 import emission.core.wrapper.transition as ecwt
 import emission.storage.timeseries.timequery as estt
 
-def is_tracking_restarted_in_range(start_ts, end_ts, timeseries):
+def is_tracking_restarted_in_range(start_ts, end_ts, transition_df):
     """
     Check to see if tracing was restarted between the times specified
     :param start_ts: the start of the time range to check
@@ -17,28 +17,31 @@ def is_tracking_restarted_in_range(start_ts, end_ts, timeseries):
     :param timeseries: the timeseries to use for checking
     :return:
     """
-    import emission.storage.timeseries.timequery as estt
+    transition_df_start_idx=transition_df.ts.searchsorted(start_ts,side='left')    
+    transition_df_end_idx=transition_df.ts.searchsorted(end_ts,side='right')
+    transition_df_for_current=transition_df.iloc[transition_df_start_idx:transition_df_end_idx]
 
-    tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
-                        endTs=end_ts)
-    transition_df = timeseries.get_data_df("statemachine/transition", tq)
-    if len(transition_df) == 0:
+    if len(transition_df_for_current) == 0:
         logging.debug("In range %s -> %s found no transitions" %
-                      (tq.startTs, tq.endTs))
+                      (start_ts, end_ts))
         return False
     logging.debug("In range %s -> %s found transitions %s" %
-                  (tq.startTs, tq.endTs, transition_df[["fmt_time", "curr_state", "transition"]]))
-    return _is_tracking_restarted_android(transition_df) or \
-           _is_tracking_restarted_ios(transition_df)
+                  (start_ts, end_ts, transition_df_for_current[["fmt_time", "curr_state", "transition"]]))
+    return _is_tracking_restarted_android(transition_df_for_current) or \
+           _is_tracking_restarted_ios(transition_df_for_current)
 
-def get_ongoing_motion_in_range(start_ts, end_ts, timeseries):
-    tq = estt.TimeQuery(timeType = "data.ts", startTs = start_ts,
-                        endTs = end_ts)
-    motion_list = list(timeseries.find_entries(["background/motion_activity"], tq))
+def get_ongoing_motion_in_range(start_ts, end_ts, motion_df):
+    ## in case when we receive an empty dataframe, there's nothing to
+    ## process
+    if motion_df.shape == (0,0): 
+        return motion_df
+    
+    motion_df_start_idx=motion_df.ts.searchsorted(start_ts,side='left')    
+    motion_df_end_idx=motion_df.ts.searchsorted(end_ts,side='right')
+    filtered_motion_df=motion_df.iloc[motion_df_start_idx:motion_df_end_idx]
     logging.debug("Found %s motion_activity entries in range %s -> %s" %
-                  (len(motion_list), tq.startTs, tq.endTs))
-    logging.debug("sample activities are %s" % motion_list[0:5])
-    return motion_list
+                 (len(filtered_motion_df),start_ts, end_ts))
+    return filtered_motion_df
 
 def _is_tracking_restarted_android(transition_df):
     """
diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py
index d6828af77..62856c3a5 100644
--- a/emission/analysis/intake/segmentation/trip_segmentation.py
+++ b/emission/analysis/intake/segmentation/trip_segmentation.py
@@ -7,6 +7,7 @@
 from builtins import *
 from builtins import object
 import logging
+import pandas as pd
 
 import emission.storage.timeseries.abstract_timeseries as esta
 import emission.storage.decorations.place_queries as esdp
@@ -65,6 +66,12 @@ def segment_current_trips(user_id):
     # We need to use the appropriate filter based on the incoming data
     # So let's read in the location points for the specified query
     loc_df = ts.get_data_df("background/filtered_location", time_query)
+    transition_df = ts.get_data_df("statemachine/transition", time_query)
+    motion_df = ts.get_data_df("background/motion_activity",time_query)
+    if len(transition_df) > 0:
+        logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
+    else:
+        logging.debug("no transitions found. This can happen for continuous sensing")
     if len(loc_df) == 0:
         # no new segments, no need to keep looking at these again
         logging.debug("len(loc_df) == 0, early return")
@@ -89,12 +96,12 @@ def segment_current_trips(user_id):
     if len(filters_in_df) == 1:
         # Common case - let's make it easy
         
-        segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts,
+        segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(transition_df,motion_df,ts,
             time_query)
     else:
         segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query,
                                                                filters_in_df,
-                                                               filter_methods)
+                                                               filter_methods,transition_df,motion_df)
     # Create and store trips and places based on the segmentation points
     if segmentation_points is None:
         epq.mark_segmentation_failed(user_id)
@@ -104,13 +111,13 @@ def segment_current_trips(user_id):
         epq.mark_segmentation_done(user_id, None)
     else:
         try:
-            create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]])
+            create_places_and_trips(user_id, transition_df, segmentation_points, filter_method_names[filters_in_df[0]])
             epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods))
         except:
             logging.exception("Trip generation failed for user %s" % user_id)
             epq.mark_segmentation_failed(user_id)
             
-def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods):
+def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods, transition_df, motion_df):
     """
     We can have mixed filters in a particular time range for multiple reasons.
     a) user switches phones from one platform to another
@@ -149,7 +156,7 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt
             time_query.endTs = loc_df.iloc[endIndex+1].ts
         logging.debug("for filter %s, startTs = %d and endTs = %d" %
             (curr_filter, time_query.startTs, time_query.endTs))
-        segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query)
+        segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(transition_df,motion_df,ts, time_query)
     logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys()))
     sortedStartTsList = sorted(segmentation_map.keys())
     segmentation_points = []
@@ -171,7 +178,7 @@ def get_last_ts_processed(filter_methods):
     logging.info("Returning last_ts_processed = %s" % last_ts_processed)
     return last_ts_processed
 
-def create_places_and_trips(user_id, segmentation_points, segmentation_method_name):
+def create_places_and_trips(user_id, transition_df, segmentation_points, segmentation_method_name):
     # new segments, need to deal with them
     # First, retrieve the last place so that we can stitch it to the newly created trip.
     # Again, there are easy and hard. In the easy case, the trip was
@@ -214,7 +221,7 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na
         new_place_entry = ecwe.Entry.create_entry(user_id,
                             "segmentation/raw_place", new_place, create_id = True)
 
-        if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name):
+        if found_untracked_period(transition_df, last_place_entry.data, start_loc, segmentation_method_name):
             # Fill in the gap in the chain with an untracked period
             curr_untracked = ecwut.Untrackedtime()
             curr_untracked.source = segmentation_method_name
@@ -254,7 +261,7 @@ def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start
     # it will be lost
     ts.update(last_place_entry)
 
-def found_untracked_period(timeseries, last_place, start_loc, segmentation_method_name):
+def found_untracked_period(transition_df, last_place, start_loc, segmentation_method_name):
     """
     Check to see whether the two places are the same.
     This is a fix for https://github.com/e-mission/e-mission-server/issues/378
@@ -270,7 +277,7 @@ def found_untracked_period(timeseries, last_place, start_loc, segmentation_metho
         logging.debug("start of a chain, unable to check for restart from previous trip end, assuming not restarted")
         return False
 
-    if _is_tracking_restarted(last_place, start_loc, timeseries):
+    if _is_tracking_restarted(last_place, start_loc, transition_df):
         logging.debug("tracking has been restarted, returning True")
         return True
 
@@ -378,6 +385,6 @@ def stitch_together_end(new_place_entry, curr_trip_entry, end_loc):
     new_place_entry["data"] = new_place
     curr_trip_entry["data"] = curr_trip
 
-def _is_tracking_restarted(last_place, start_loc, timeseries):
-    return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, timeseries)
+def _is_tracking_restarted(last_place, start_loc, transition_df):
+    return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, transition_df)
 
diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py
index ea53c9abb..04f8f5c10 100644
--- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py
+++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py
@@ -38,7 +38,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
         self.point_threshold = point_threshold
         self.distance_threshold = distance_threshold
 
-    def segment_into_trips(self, timeseries, time_query):
+    def segment_into_trips(self, transition_df, motion_df, timeseries, time_query):
         """
         Examines the timeseries database for a specific range and returns the
         segmentation points. Note that the input is the entire timeseries and
@@ -48,7 +48,7 @@ def segment_into_trips(self, timeseries, time_query):
         """
         self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query)
         self.filtered_points_df.loc[:,"valid"] = True
-        self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
+        self.transition_df = transition_df
         if len(self.transition_df) > 0:
             logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
         else:
@@ -88,7 +88,7 @@ def segment_into_trips(self, timeseries, time_query):
                 # So we reset_index upstream and use it here.
                 last10Points_df = self.filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
                 lastPoint = self.find_last_valid_point(idx)
-                if self.has_trip_ended(lastPoint, currPoint, timeseries):
+                if self.has_trip_ended(lastPoint, currPoint, timeseries, motion_df):
                     last_trip_end_point = lastPoint
                     logging.debug("Appending last_trip_end_point %s with index %s " %
                         (last_trip_end_point, idx-1))
@@ -144,7 +144,7 @@ def segment_into_trips(self, timeseries, time_query):
                 logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
         return segmentation_points
 
-    def has_trip_ended(self, lastPoint, currPoint, timeseries):
+    def has_trip_ended(self, lastPoint, currPoint, timeseries, motion_df):
         # So we must not have been moving for the last _time filter_
         # points. So the trip must have ended
         # Since this is a distance filter, we detect that the last
@@ -173,14 +173,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries):
             # for this kind of test
             speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2)))
 
-            if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries):
+            if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, self.transition_df):
                 logging.debug("tracking was restarted, ending trip")
                 return True
 
             # In general, we get multiple locations between each motion activity. If we see a bunch of motion activities
             # between two location points, and there is a large gap between the last location and the first
             # motion activity as well, let us just assume that there was a restart
-            ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries)
+            ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, motion_df)
             ongoing_motion_check = len(ongoing_motion_in_range) > 0
             if timeDelta > self.time_threshold and not ongoing_motion_check:
                 logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py
index d75760cd9..f2832d1df 100644
--- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py
+++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py
@@ -20,6 +20,7 @@
 import emission.core.wrapper.location as ecwl
 
 import emission.analysis.intake.segmentation.restart_checking as eaisr
+import emission.core.common as ec
 
 class DwellSegmentationTimeFilter(eaist.TripSegmentationMethod):
     def __init__(self, time_threshold, point_threshold, distance_threshold):
@@ -53,7 +54,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
         self.point_threshold = point_threshold
         self.distance_threshold = distance_threshold
 
-    def segment_into_trips(self, timeseries, time_query):
+    def segment_into_trips(self,transition_df,motion_df,timeseries, time_query):
         """
         Examines the timeseries database for a specific range and returns the
         segmentation points. Note that the input is the entire timeseries and
@@ -113,24 +114,26 @@ def segment_into_trips(self, timeseries, time_query):
             # We are going to use the last 8 points for now.
             # TODO: Change this back to last 10 points once we normalize phone and this
             last10Points_df = filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
-            distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint)
-            timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts
-            last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1)
-            logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances)))
-            last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1)
-            logging.debug("last10PointsDistances = %s with length %d, shape %s" % (last10PointsDistances.to_numpy(),
-                                                                           len(last10PointsDistances),
-                                                                           last10PointsDistances.shape))
-
+            last10Points_coords=last10Points_df[['longitude','latitude']].to_numpy()
+            # create a numpy array of the current coordinates with matching dimensions
+            currPoint_coords = np.repeat(np.array([[currPoint.longitude,currPoint.latitude]]),len(last10Points_df),axis=0)
+            #compute distance
+            last10PointsDistances=ec.calDistance(last10Points_coords,currPoint_coords)
+            # Reset the current coordinates numpy array to match the dimensions of the last 5 mins points array
+            currPoint_coords = np.repeat(np.array([[currPoint.longitude,currPoint.latitude]]),len(last5MinsPoints_df),axis=0)
+            # get 2d numpy array, from df
+            last5MinsPoints_coords=last5MinsPoints_df[['longitude','latitude']].to_numpy()
+            # calculate distance
+            last5MinsDistances=ec.calDistance(last5MinsPoints_coords,currPoint_coords) 
             # Fix for https://github.com/e-mission/e-mission-server/issues/348
-            last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)
+            last5MinTimes = currPoint.ts-last5MinsPoints_df.ts
             
             logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" %
                   (len(last10PointsDistances), len(last5MinsDistances)))
             logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" %
                           (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))
 
-            if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
+            if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df, motion_df):
                 (ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df,
                                                                                        last10Points_df, last5MinsPoints_df)
                 segmentation_points.append((curr_trip_start_point, last_trip_end_point))
@@ -199,7 +202,7 @@ def continue_just_ended(self, idx, currPoint, filtered_points_df):
             else:
                 return False
 
-    def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
+    def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df, motion_df):
         # Another mismatch between phone and server. Phone stops tracking too soon,
         # so the distance is still greater than the threshold at the end of the trip.
         # But then the next point is a long time away, so we can split again (similar to a distance filter)
@@ -214,11 +217,11 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc
                 speedDelta = np.nan
             speedThreshold = old_div(float(self.distance_threshold), self.time_threshold)
 
-            if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries):
+            if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, transition_df):
                 logging.debug("tracking was restarted, ending trip")
                 return True
 
-            ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0
+            ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, motion_df)) > 0
             if timeDelta > 2 * self.time_threshold and not ongoing_motion_check:
                 logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
                               (prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check))
diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py
index c329fe69d..76f13e2c9 100644
--- a/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py
+++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py
@@ -23,8 +23,8 @@ def is_huge_invalid_ts_offset(filterMethod, lastPoint, currPoint, timeseries,
                         ecwm.MotionTypes.NONE.value,
                         ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE.value]
 
-    non_still_motions = [ma for ma in motionInRange if ma["data"]["type"] not in ignore_modes_list and ma["data"]["confidence"] == 100] 
-    logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions])
+    non_still_motions=motionInRange[~motionInRange['type'].isin(ignore_modes_list) & (motionInRange['confidence'] ==100)]
+    #logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions])
 
     non_still_motions_rate = len(non_still_motions) / (currPoint.ts - lastPoint.ts)
 
diff --git a/emission/core/common.py b/emission/core/common.py
index 4d97ce681..a2594643c 100644
--- a/emission/core/common.py
+++ b/emission/core/common.py
@@ -10,6 +10,7 @@
 from random import randrange
 import logging
 import copy
+import numpy as np
 from datetime import datetime, timedelta
 from dateutil import parser
 from pytz import timezone
@@ -51,7 +52,20 @@ def calDistance(point1, point2, coordinates=False):
     # SHANKARI: Why do we have two calDistance() functions?
     # Need to combine into one
     # points are now in geojson format (lng,lat)
-    if coordinates:
+
+    #Added to Support vectorization when dealing with numpy array
+    if isinstance(point1,np.ndarray) and isinstance(point2,np.ndarray):
+        dLat = np.radians(point1[:,1]-point2[:,1])
+        dLon = np.radians(point1[:,0]-point2[:,0])
+        lat1 = np.radians(point1[:,1])
+        lat2 = np.radians(point2[:,1]) 
+
+        a = (np.sin(dLat/2) ** 2) + ((np.sin(dLon/2) ** 2) * np.cos(lat1) * np.cos(lat2))
+        c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
+        d = earthRadius * c
+
+        return d
+    elif coordinates:
         dLat = math.radians(point1.lat-point2.lat)
         dLon = math.radians(point1.lon-point2.lon)
         lat1 = math.radians(point1.lat)
diff --git a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py
index 0cc469fea..575fac606 100644
--- a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py
+++ b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py
@@ -68,10 +68,12 @@ def testEmptyCall(self):
     def testSegmentationPointsDwellSegmentationTimeFilter(self):
         ts = esta.TimeSeries.get_time_series(self.androidUUID)
         tq = estt.TimeQuery("metadata.write_ts", 1440658800, 1440745200)
+        transition_df = ts.get_data_df("statemachine/transition", tq)
+        motion_df = ts.get_data_df("background/motion_activity",tq)
         dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins
                                                   point_threshold = 10,
                                                   distance_threshold = 100) # 100 m
-        segmentation_points = dstfsm.segment_into_trips(ts, tq)
+        segmentation_points = dstfsm.segment_into_trips(transition_df, motion_df, ts, tq)
         for (start, end) in segmentation_points:
             logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts))
         self.assertIsNotNone(segmentation_points)
@@ -86,10 +88,12 @@ def testSegmentationPointsDwellSegmentationTimeFilter(self):
     def testSegmentationPointsDwellSegmentationDistFilter(self):
         ts = esta.TimeSeries.get_time_series(self.iosUUID)
         tq = estt.TimeQuery("metadata.write_ts", 1446796800, 1446847600)
+        transition_df = ts.get_data_df("statemachine/transition", tq)
+        motion_df = ts.get_data_df("background/motion_activity",tq)
         dstdsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 5 mins
                                                   point_threshold = 10,
                                                   distance_threshold = 100) # 100 m
-        segmentation_points = dstdsm.segment_into_trips(ts, tq)
+        segmentation_points = dstdsm.segment_into_trips(transition_df, motion_df, ts, tq)
         for (start, end) in segmentation_points:
             logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts))
         self.assertIsNotNone(segmentation_points)