Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trimmed Pipeline Stat Tracking #994

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 19 additions & 30 deletions emission/analysis/intake/segmentation/trip_segmentation.py
Original file line number Diff line number Diff line change
@@ -51,28 +51,20 @@ def segment_into_trips(self, timeseries, time_query):
pass

def segment_current_trips(user_id):
with ect.Timer() as t_get_time_series:
ts = esta.TimeSeries.get_time_series(user_id)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_series", time.time(), t_get_time_series.elapsed)
ts = esta.TimeSeries.get_time_series(user_id)

with ect.Timer() as t_get_time_range:
time_query = epq.get_time_range_for_segmentation(user_id)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_range_for_segmentation", time.time(), t_get_time_range.elapsed)
time_query = epq.get_time_range_for_segmentation(user_id)

import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf
import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf

with ect.Timer() as t_create_time_filter:
dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins
dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins
point_threshold=9,
distance_threshold=100) # 100 m
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_time_filter", time.time(), t_create_time_filter.elapsed)

with ect.Timer() as t_create_dist_filter:
dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins
dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins
point_threshold=9,
distance_threshold=50) # 50 m
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_dist_filter", time.time(), t_create_dist_filter.elapsed)

filter_methods = {"time": dstfsm, "distance": dsdfsm}
filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"}
@@ -89,24 +81,21 @@ def segment_current_trips(user_id):
epq.mark_segmentation_done(user_id, None)
return

with ect.Timer() as t_handle_out_of_order:
out_of_order_points = loc_df[loc_df.ts.diff() < 0]
if len(out_of_order_points) > 0:
logging.info("Found out of order points!")
logging.info("%s" % out_of_order_points)
# drop from the table
loc_df = loc_df.drop(out_of_order_points.index.tolist())
loc_df.reset_index(inplace=True)
# invalidate in the database.
out_of_order_id_list = out_of_order_points["_id"].tolist()
logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
for ooid in out_of_order_id_list:
ts.invalidate_raw_entry(ooid)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/handle_out_of_order_points", time.time(), t_handle_out_of_order.elapsed)

with ect.Timer() as t_get_filters:
filters_in_df = loc_df["filter"].dropna().unique()
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_filters_in_df", time.time(), t_get_filters.elapsed)
out_of_order_points = loc_df[loc_df.ts.diff() < 0]
if len(out_of_order_points) > 0:
logging.info("Found out of order points!")
logging.info("%s" % out_of_order_points)
# drop from the table
loc_df = loc_df.drop(out_of_order_points.index.tolist())
loc_df.reset_index(inplace=True)
# invalidate in the database.
out_of_order_id_list = out_of_order_points["_id"].tolist()
logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
for ooid in out_of_order_id_list:
ts.invalidate_raw_entry(ooid)


filters_in_df = loc_df["filter"].dropna().unique()

logging.debug("Filters in the dataframe = %s" % filters_in_df)
if len(filters_in_df) == 1:
Original file line number Diff line number Diff line change
@@ -60,23 +60,10 @@ def segment_into_trips(self, timeseries, time_query):
t_get_filtered_points.elapsed
)

with ect.Timer() as t_mark_valid:
self.filtered_points_df.loc[:, "valid"] = True
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/mark_valid",
time.time(),
t_mark_valid.elapsed
)
self.filtered_points_df.loc[:, "valid"] = True

self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)

with ect.Timer() as t_get_transition_df:
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df",
time.time(),
t_get_transition_df.elapsed
)

if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
@@ -103,14 +90,7 @@ def segment_into_trips(self, timeseries, time_query):
# segmentation_points.append(currPoint)

if just_ended:
with ect.Timer() as t_continue_just_ended:
continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/continue_just_ended",
time.time(),
t_continue_just_ended.elapsed
)
continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df)

if continue_flag:
# We have "processed" the currPoint by deciding to glom it
@@ -119,124 +99,83 @@ def segment_into_trips(self, timeseries, time_query):
# else:
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
with ect.Timer() as t_set_start_point:
curr_trip_start_point = sel_point
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_new_trip_start_point",
time.time(),
t_set_start_point.elapsed
)
curr_trip_start_point = sel_point
just_ended = False
else:
with ect.Timer() as t_process_trip:
# Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
# Using .iloc just ends up including points after this one.
# So we reset_index upstream and use it here.
last10Points_df = self.filtered_points_df.iloc[
max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1
]
lastPoint = self.find_last_valid_point(idx)
with ect.Timer() as t_has_trip_ended:
trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/has_trip_ended",
time.time(),
t_has_trip_ended.elapsed
)

if trip_ended:
with ect.Timer() as t_get_last_trip_end_point:
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx - 1))
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_last_trip_end_point",
time.time(),
t_get_last_trip_end_point.elapsed
)

with ect.Timer() as t_handle_trip_end:
just_ended = True
# Now, we have finished processing the previous point as a trip
# end or not. But we still need to process this point by seeing
# whether it should represent a new trip start, or a glom to the
# previous trip
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/handle_trip_end",
time.time(),
t_handle_trip_end.elapsed
)
# Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
# Using .iloc just ends up including points after this one.
# So we reset_index upstream and use it here.
last10Points_df = self.filtered_points_df.iloc[
max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1
]
lastPoint = self.find_last_valid_point(idx)

trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries)

if trip_ended:
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx - 1))
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts

just_ended = True
# Now, we have finished processing the previous point as a trip
# end or not. But we still need to process this point by seeing
# whether it should represent a new trip start, or a glom to the
# previous trip
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/loop",
time.time(),
t_loop.elapsed
)

with ect.Timer() as t_post_loop:
# Since we only end a trip when we start a new trip, this means that
# the last trip that was pushed is ignored. Consider the example of
# 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
# so during that remote push, a trip end was not detected. And we got
# back home shortly after 5pm, so the trip end was only detected on the
# phone at 6pm. At that time, the following points were pushed:
# ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
# ..., 2016-02-22T16:57:04
# Then, on the server, while iterating through the points, we detected
# a trip end at 16:04, and a new trip start at 16:49. But we did not
# detect the trip end at 16:57, because we didn't start a new point.
# This has two issues:
# - we won't see this trip until the next trip start, which may be on the next day
# - we won't see this trip at all, because when we run the pipeline the
# next time, we will only look at points from that time onwards. These
# points have been marked as "processed", so they won't even be considered.

# There are multiple potential fixes:
# - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
# - we can mark a trip end based on the fact that we only push data
# when a trip ends, so if we have data, it means that the trip has been
# detected as ended on the phone.
# This seems a bit fragile - what if we start pushing incomplete trip
# data for efficiency reasons? Therefore, we also check to see if there
# is a trip_end_detected in this timeframe after the last point. If so,
# then we end the trip at the last point that we have.
if not just_ended and len(self.transition_df) > 0:
with ect.Timer() as t_check_transitions:
stopped_moving_after_last = self.transition_df[
(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
]
logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
if len(stopped_moving_after_last) > 0:
logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
segmentation_points.append((curr_trip_start_point, currPoint))
self.last_ts_processed = currPoint.metadata_write_ts
else:
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/check_transitions_post_loop",
time.time(),
t_check_transitions.elapsed
)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/post_loop",
time.time(),
t_post_loop.elapsed
)

# Since we only end a trip when we start a new trip, this means that
# the last trip that was pushed is ignored. Consider the example of
# 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
# so during that remote push, a trip end was not detected. And we got
# back home shortly after 5pm, so the trip end was only detected on the
# phone at 6pm. At that time, the following points were pushed:
# ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
# ..., 2016-02-22T16:57:04
# Then, on the server, while iterating through the points, we detected
# a trip end at 16:04, and a new trip start at 16:49. But we did not
# detect the trip end at 16:57, because we didn't start a new point.
# This has two issues:
# - we won't see this trip until the next trip start, which may be on the next day
# - we won't see this trip at all, because when we run the pipeline the
# next time, we will only look at points from that time onwards. These
# points have been marked as "processed", so they won't even be considered.

# There are multiple potential fixes:
# - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
# - we can mark a trip end based on the fact that we only push data
# when a trip ends, so if we have data, it means that the trip has been
# detected as ended on the phone.
# This seems a bit fragile - what if we start pushing incomplete trip
# data for efficiency reasons? Therefore, we also check to see if there
# is a trip_end_detected in this timeframe after the last point. If so,
# then we end the trip at the last point that we have.
if not just_ended and len(self.transition_df) > 0:
stopped_moving_after_last = self.transition_df[
(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
]
logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
if len(stopped_moving_after_last) > 0:
logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
segmentation_points.append((curr_trip_start_point, currPoint))
self.last_ts_processed = currPoint.metadata_write_ts
else:
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))

return segmentation_points

Original file line number Diff line number Diff line change
@@ -75,30 +75,16 @@ def segment_into_trips(self, timeseries, time_query):
t_get_filtered_points_pre.elapsed
)

with ect.Timer() as t_filter_bogus_points:
# Sometimes, we can get bogus points because data.ts and
# metadata.write_ts are off by a lot. If we don't do this, we end up
# appearing to travel back in time
# https://github.com/e-mission/e-mission-server/issues/457
filtered_points_df = filtered_points_pre_ts_diff_df[
(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000
]
filtered_points_df.reset_index(inplace=True)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points",
time.time(),
t_filter_bogus_points.elapsed
)
# Sometimes, we can get bogus points because data.ts and
# metadata.write_ts are off by a lot. If we don't do this, we end up
# appearing to travel back in time
# https://github.com/e-mission/e-mission-server/issues/457
filtered_points_df = filtered_points_pre_ts_diff_df[
(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000
]
filtered_points_df.reset_index(inplace=True)

with ect.Timer() as t_get_transition_df:
transition_df = timeseries.get_data_df("statemachine/transition", time_query)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df",
time.time(),
t_get_transition_df.elapsed
)
transition_df = timeseries.get_data_df("statemachine/transition", time_query)

if len(transition_df) > 0:
logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
@@ -135,113 +121,95 @@ def segment_into_trips(self, timeseries, time_query):
curr_trip_start_point = sel_point
just_ended = False

with ect.Timer() as t_calculations:
last5MinsPoints_df = filtered_points_df[np.logical_and(
np.logical_and(
filtered_points_df.ts > currPoint.ts - self.time_threshold,
filtered_points_df.ts < currPoint.ts
),
filtered_points_df.ts >= curr_trip_start_point.ts
)]
# Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
# Using .iloc just ends up including points after this one.
# So we reset_index upstream and use it here.
# We are going to use the last 8 points for now.
# TODO: Change this back to last 10 points once we normalize phone and this
last10Points_df = filtered_points_df.iloc[
max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1
]
distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint)
timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts
last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1)
logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances)))
last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1)
logging.debug("last10PointsDistances = %s with length %d, shape %s" % (
last10PointsDistances.to_numpy(),
len(last10PointsDistances),
last10PointsDistances.shape
))

# Fix for https://github.com/e-mission/e-mission-server/issues/348
last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)

logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" %
(len(last10PointsDistances), len(last5MinsDistances)))
logging.debug("last5MinTimes.max() = %s, time_threshold = %s" %
(last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))

esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculations_per_iteration",
time.time(),
t_calculations.elapsed
)

with ect.Timer() as t_has_trip_ended:
if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(
filtered_points_df,
last10Points_df,
last5MinsPoints_df
)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
if ended_before_this:
# in this case, we end a trip at the previous point, and the next trip starts at this
# point, not the next one
just_ended = False
prevPoint = currPoint
curr_trip_start_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" %
(currPoint, currPoint.idx))
else:
# We end a trip at the current point, and the next trip starts at the next point
just_ended = True
prevPoint = None
else:
prevPoint = currPoint
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/has_trip_ended",
time.time(),
t_has_trip_ended.elapsed
)
last5MinsPoints_df = filtered_points_df[np.logical_and(
np.logical_and(
filtered_points_df.ts > currPoint.ts - self.time_threshold,
filtered_points_df.ts < currPoint.ts
),
filtered_points_df.ts >= curr_trip_start_point.ts
)]
# Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
# Using .iloc just ends up including points after this one.
# So we reset_index upstream and use it here.
# We are going to use the last 8 points for now.
# TODO: Change this back to last 10 points once we normalize phone and this
last10Points_df = filtered_points_df.iloc[
max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1
]
distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint)
timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts
last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1)
logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances)))
last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1)
logging.debug("last10PointsDistances = %s with length %d, shape %s" % (
last10PointsDistances.to_numpy(),
len(last10PointsDistances),
last10PointsDistances.shape
))

esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/loop",
time.time(),
t_loop.elapsed
)
# Fix for https://github.com/e-mission/e-mission-server/issues/348
last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)

with ect.Timer() as t_post_loop:
logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
(just_ended, len(transition_df)))
if not just_ended and len(transition_df) > 0:
stopped_moving_after_last = transition_df[
(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)
]
logging.debug("looking after %s, found transitions %s" %
(currPoint.ts, stopped_moving_after_last))
if len(stopped_moving_after_last) > 0:
(unused, last_trip_end_point) = self.get_last_trip_end_point(
logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" %
(len(last10PointsDistances), len(last5MinsDistances)))
logging.debug("last5MinTimes.max() = %s, time_threshold = %s" %
(last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))



if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(
filtered_points_df,
last10Points_df,
None
last5MinsPoints_df
)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time)
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
if ended_before_this:
# in this case, we end a trip at the previous point, and the next trip starts at this
# point, not the next one
just_ended = False
prevPoint = currPoint
curr_trip_start_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" %
(currPoint, currPoint.idx))
else:
# We end a trip at the current point, and the next trip starts at the next point
just_ended = True
prevPoint = None
else:
prevPoint = currPoint


esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/post_loop",
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/loop",
time.time(),
t_post_loop.elapsed
t_loop.elapsed
)

logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
(just_ended, len(transition_df)))
if not just_ended and len(transition_df) > 0:
stopped_moving_after_last = transition_df[
(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)
]
logging.debug("looking after %s, found transitions %s" %
(currPoint.ts, stopped_moving_after_last))
if len(stopped_moving_after_last) > 0:
(unused, last_trip_end_point) = self.get_last_trip_end_point(
filtered_points_df,
last10Points_df,
None
)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts

return segmentation_points

def continue_just_ended(self, idx, currPoint, filtered_points_df):